What does Onyx offer?
In this chapter, I’ll enumerate and explain the reasons why we built Onyx.
An Information Model
Information models are often superior to APIs, and almost always better than DSLs. The hyper-flexibility of a data structure literal allows Onyx workflows and catalogs to be constructed at a distance, meaning on another machine, in a different language, by another program, etc.
The information model for an Onyx workflow has the distinct advantage that it’s possible to compile other workflow representations (perhaps a datalog or SQL query) into the workflow that Onyx understands. The Information Model chapter describes a target for data structure compilation.
Temporal Decoupling
To the extent that Onyx places data at the highest importance, very few Onyx constructs actually need to be generated at the same time as program deployment or peer registration. Programs can create workflows, drop them to a database, and pull them out at a later time without any problems.
Elimination of Macros
Macros are a tremendously powerful tool, but are often inappropriate for end-user consumption of an API. Onyx goes beyond Storm's defbolt and defspout by making vanilla Clojure functions shine. These functions need no context to execute and do not require any dynamic bindings. They receive all information that they need via parameters, which are injected by Onyx's task lifecycles.
Plain Clojure Functions
To the same point above, we want plain Clojure functions to be the building blocks for application logic. Onyx’s functions can be tested directly without any special test runner.
Testing Without Mocking
In general, your design is in trouble when you've reached for with-redefs or something along those lines to mock functions. Onyx places a high importance on programming against interfaces, and even more so on putting space in between small components with channels. Onyx programs can be tested in development mode, and moved to production mode with only a small configuration file change. If you'd like to change your input or output plugins, all you need to do is re-associate the catalog entry with something like an in-memory plugin. No interface mocking code required.
Easy Parameterization of Workflows
It’s particularly telling that many compute frameworks don’t offer an easy way to parameterize workflows. Onyx puts space between the caller and the function definition. Parameterize tasks inside the catalog, and update the catalog entry at will. Additionally, Onyx allows peers to spin up their own parameters at boot-up time.
Transparent Code Reuse for Batch and Streaming
Onyx uses the notion of a sentinel value to transparently switch between streaming and batching modes. This makes it easy to reuse the same code for both batch and streaming computations.
Aspect Orientation
Clojure functions again serve as a huge win. Dire is a library that supports aspects, meaning you can keep your application logic airtight away from logging, preconditions, and error handling.
AOT Nothing
Onyx AOT’s absolutely nothing on your behalf. When you’re ready to stand your jar up, simply uberjar and start executing on the target machine. Hadoop and Storm cause dependency hell (In Storm’s case, you’re restricted to a particular version of Clojure because you’re locked in by the Executor) by providing their own dependencies on top of yours. Onyx won’t mess with your dependencies.
You can, however, AOT Onyx yourself to speed up compilation times.
Concepts
We’ll take a quick overview of some terms you’ll see in the rest of this user guide.
Terminology
Segment
A segment is the unit of data in Onyx, and it’s represented by a Clojure map. Segments represent the data flowing through the cluster. Segments are the only shape of data that Onyx allows you to emit between functions.
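For example, a segment is simply a map such as the following (an illustrative shape, not a required schema):
{:event-id 42 :status :shipped}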
Task
A task is the smallest unit of work in Onyx. It represents an activity of either input, processing, or output.
Workflow
A workflow is the structural specification of an Onyx program. Its purpose is to articulate the paths that data can take through the cluster at runtime. It is specified via a directed, acyclic graph.
The workflow representation is a Clojure vector of vectors. Each inner vector contains exactly two elements, which are keywords. The keywords represent nodes in the graph, and the vector represents a directed edge from the first node to the second.
;;; in
;;; |
;;; increment
;;; |
;;; out
[[:in :increment] [:increment :out]]
;;; input
;;; /\
;;; processing-1 processing-2
;;; | |
;;; output-1 output-2
[[:input :processing-1]
[:input :processing-2]
[:processing-1 :output-1]
[:processing-2 :output-2]]
;;; input
;;; /\
;;; processing-1 processing-2
;;; \ /
;;; output
[[:input :processing-1]
[:input :processing-2]
[:processing-1 :output]
[:processing-2 :output]]
Tip: Example projects: flat-workflow, multi-output-workflow
Catalog
All inputs, outputs, and functions in a workflow must be described via a catalog. A catalog is a vector of maps, strikingly similar to Datomic’s schema. Configuration and docstrings are described in the catalog.
Example:
[{:onyx/name :in
:onyx/plugin :onyx.plugin.core-async/input
:onyx/type :input
:onyx/medium :core.async
:onyx/batch-size batch-size
:onyx/max-peers 1
:onyx/doc "Reads segments from a core.async channel"}
{:onyx/name :inc
:onyx/fn :onyx.peer.min-peers-test/my-inc
:onyx/type :function
:onyx/batch-size batch-size}
{:onyx/name :out
:onyx/plugin :onyx.plugin.core-async/output
:onyx/type :output
:onyx/medium :core.async
:onyx/batch-size batch-size
:onyx/max-peers 1
:onyx/doc "Writes segments to a core.async channel"}]
Flow Conditions
In contrast to a workflow, flow conditions specify, on a segment-by-segment basis, which direction data should flow, as determined by predicate functions. This is helpful for conditionally processing a segment based on its content.
Example:
[{:flow/from :input-stream
:flow/to [:process-adults]
:flow/predicate :my.ns/adult?
:flow/doc "Emits segment if this segment is an adult."}
Function
A function is a construct that receives segments and emits segments for further processing. It literally is a Clojure function.
Lifecycle
A lifecycle is a construct that describes the lifetime of a task. There is an entire chapter devoted to lifecycles, but to be brief, a lifecycle allows you to hook in and execute arbitrary code at critical points during a task. A lifecycle carries a context map that you can merge results back into for use later.
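Lifecycle entries are themselves plain data. A minimal sketch, assuming a hypothetical var my.ns/log-calls that holds the lifecycle functions:
[{:lifecycle/task :inc
  :lifecycle/calls :my.ns/log-calls
  :lifecycle/doc "Logs a message at each point in the task lifecycle"}]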
Windows
Windows are a construct that partitions a possibly unbounded sequence of data into finite pieces, allowing aggregations to be specified. This lets you treat an infinite sequence of data as if it were finite over a given period of time.
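A hedged sketch of a window entry, with the task name and window key being illustrative:
{:window/id :count-events
 :window/task :process-events
 :window/type :fixed
 :window/aggregation :onyx.windowing.aggregation/count
 :window/window-key :event-time
 :window/range [5 :minutes]}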
Plugin
A plugin is a means for hooking into data sources to extract data as input and produce data as output. Onyx comes with a few plugins, but you can craft your own, too.
Sentinel
A sentinel is a value that can be pushed into Onyx to signal the end of a stream of data. This effectively lets Onyx switch between streaming and batching mode. The sentinel in Onyx is represented by the Clojure keyword :done.
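For example, with the core.async input plugin a finite stream can be terminated by pushing :done onto the input channel (a sketch; input-chan is assumed to exist):
(require '[clojure.core.async :refer [>!!]])

(>!! input-chan {:n 1})
(>!! input-chan {:n 2})
(>!! input-chan :done) ; signals the end of the stream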
Peer
A Peer is a node in the cluster responsible for processing data. A single "peer" refers to a physical machine, though we often use the terms peer and virtual peer interchangeably when the difference doesn’t matter.
Architecture & Low Level Design
This chapter outlines how Onyx works on the inside to meet the required properties of a distributed data processing system. This is not a formal proof nor an iron-clad specification for other implementations of Onyx. I will do my best to be transparent about how everything is working under the hood - good and bad. If something doesn’t make sense, keep moving. There are inevitable forward references.
High Level Components
Peer
A Peer is a node in the cluster responsible for processing data. A peer generally refers to a physical machine, as it's typical to run only one peer per machine.
Virtual Peer
A Virtual Peer refers to a single concurrent worker running on a single physical machine. Each virtual peer spawns a small number of threads since it uses asynchronous messaging. All virtual peers are equal, whether they are on the same physical machine or not. Virtual peers communicate segments directly to one another, and coordinate strictly via the log in ZooKeeper.
ZooKeeper
Apache ZooKeeper is used as both a storage and communication layer. ZooKeeper takes care of things like CAS, consensus, and atomic counting. ZooKeeper watches are at the heart of how Onyx virtual peers detect machine failure.
Aeron
Aeron is the primary messaging transport layer. The transport layer is pluggable, though we don’t support any other transports at this time since Aeron can "short-circuit" networking and act as fast as core.async.
The Log
This design centers around a totally ordered sequence of commands using
a log structure. The log acts as an immutable history and arbiter. It’s
maintained through ZooKeeper using a directory of persistent, sequential
znodes. Virtual peers act as processes that consume from the log. At the
time a peer starts up, it initializes its local replica to the "empty
state". Log entries represent deterministic, idempotent functions to be
applied to the local replica. The peer plays the log from the beginning,
applying each log entry to its local replica. The local replica is
transformed from one value to another. As a reaction to each replica
transformation, the peer may send more commands to the tail of the log.
Peers may play the log at any rate. After each peer has played k log entries, all peers at time k will have exactly the same local
replica. Peers store everything in memory - so if a peer fails, it
simply reboots from scratch with a new identifier and plays the log
forward from the beginning. Since it has a new identifier, it has no
association to the commands it previously issued; this prevents live
lock issues.
The Inbox and Outbox
Every peer maintains its own inbox and outbox. Messages received appear in order in the inbox, and messages to-be-sent are placed in order in the outbox.
Messages arrive in the inbox as commands are proposed into the ZooKeeper log. Technically, the inbox need only be size 1 since all log entries are processed strictly in order. As an optimization, the peer can choose to read a few extra commands behind the one it’s currently processing. In practice, the inbox will probably be configured with a size greater than one.
The outbox is used to send commands to the log. Certain commands processed by the peer will generate other commands. For example, if a peer is idle and it receives a command notifying it about a new job, the peer will react by sending a command to the log requesting that it be allocated for work. Each peer can choose to pause or resume the sending of its outbox messages. This is useful when the peer is just acquiring membership to the cluster. It will have to play log commands to join the cluster fully, but it cannot volunteer to be allocated for work since it is not yet officially a member of the cluster.
Applying Log Entries
This section describes how log entries are applied to the peer's local replica. A log entry is a persistent, sequential znode. Its content is a map with keys :fn and :args. :fn is mapped to a keyword that finds this log entry's implementation. :args is mapped to another map with any data needed to apply the log entry to the replica.
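As a rough sketch, a log entry's content might look like the following; the exact :args keys vary per command, and the :joiner key and UUID here are purely illustrative:
{:fn :prepare-join-cluster
 :args {:joiner #uuid "f3b8ba18-7a98-41a2-94a9-1df04a6c81af"}}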
Peers begin with the empty state value, and local state. Local state maintains a mapping of things like the inbox and outbox - things that are specific to this peer, and presumably can’t be serialized as EDN.
Each peer starts a thread that listens for additions to the log. When the log gets a new entry, the peer calls onyx.extensions/apply-log-entry. This is a function that takes a log entry and the replica, and returns a new replica with the log entry applied to it. This is a value-to-value transformation.

A single peer begins with the empty replica ({}) and progressively applies log entries to the replica, advancing its state from one immutable value to the next.

A peer reads the first log entry and applies the function to its local replica, moving the replica into a state "as of" entry 0.

Because application of functions from the log against the replica is deterministic and free of side effects, peers do not need to coordinate about the speed at which each plays the log. Peers read the log on completely independent timelines.
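Conceptually, replaying the log is just a reduce of pure functions over a starting replica value. A minimal sketch (not the actual peer loop), assuming a sequence of already-read log entries is in hand:
(require '[onyx.extensions :as extensions])

(defn replay-log
  "Fold every log entry into the replica, oldest first."
  [starting-replica log-entries]
  (reduce (fn [replica entry]
            (extensions/apply-log-entry entry replica))
          starting-replica
          log-entries))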
Peers effect change in the world by reacting to log entries. When a log entry is applied, the peer calls onyx.extensions/replica-diff, passing it the old and new replicas. The peer produces a value summarizing what changed. This diff is used in subsequent sections to decide how to react and what side-effects to carry out.
Next, the peer calls onyx.extensions/reactions on the old/new replicas, the diff, and its local state. The peer can decide to submit new entries back to the log as a reaction to the log entry it just saw. It might react to "submit-job" with "volunteer-for-task", for instance.

After a peer reads a log entry and applies it to its local replica, it will (deterministically!) react by appending zero or more log entries to the tail of the log.
Finally, the peer can carry out side-effects by invoking onyx.extensions/fire-side-effects!. This function will do things like talking to ZooKeeper or writing to core.async channels. Isolating side effects means that a subset of the test suite can operate on pure functions alone. Each peer is tagged with a unique ID, and it looks for this ID in changes to its replica. The ID acts very much like the object-oriented "this", in that the peer uses the ID to differentiate itself and conditionally perform side effects across an otherwise uniformly behaving distributed system.
Joining the Cluster
Aside from the log structure and any strictly data/storage centric znodes, ZooKeeper maintains another directory for pulses. Each peer registers exactly one ephemeral node in the pulses directory. The name of this znode is a UUID.
3-Phase Cluster Join Strategy
When a peer wishes to join the cluster, it must engage in a 3-phase protocol. Three phases are required because the peer that is joining needs to coordinate with another peer to change its ZooKeeper watch. I call this process "stitching" a peer into the cluster.
The technique needs peers to play by the following rules:
- Every peer must be watched by another peer in ZooKeeper, unless there is exactly one peer in the cluster - in which case there are no watches.
- When a peer joins the cluster, all peers must form a "ring" in terms of who-watches-who. This makes failure repair very easy because peers can transitively close any gaps in the ring after machine failure.
- As a peer joining the cluster begins playing the log, it must buffer all reactive messages unless otherwise specified. The buffered messages are flushed after the peer has fully joined the cluster. This is because a peer could volunteer to perform work, but later abort its attempt to join the cluster, and therefore not be able to carry out any work.
- A peer picks another peer to watch by determining a candidate list of peers it can stitch into. This candidate list is sorted by peer ID. The target peer is chosen by taking the message id modulo the number of peers in the sorted candidate list. The peer chosen can't be random because all peers will play the message to select a peer to stitch with, and they must all determine the same peer. Hence, the message modulo piece is a sort of "random seed" trick.

At monotonic clock value t = 42, the replica has the above :pairs key, indicating who watches whom. As nodes are added, they maintain a ring formation so that every peer is watched by another.
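As a sketch, the relevant slice of the replica might look like the map below (peer IDs shortened to keywords for readability); each peer maps to the peer it watches:
{:pairs {:p1 :p2, :p2 :p3, :p3 :p4, :p4 :p1}}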
The algorithm works as follows:
- let S = the peer to stitch into the cluster
- S sends a prepare-join-cluster command to the log, indicating its peer ID
- S plays the log forward
- Eventually, all peers encounter the prepare-join-cluster message that was sent by it
- if the cluster size is 0:
  - S instantly becomes part of the cluster
  - S flushes its outbox of commands
- if the cluster size (n) is >= 1:
  - let Q = this peer playing the log entry
  - let A = the set of all peers fully joined in the cluster
  - let X = the single peer paired with no one (case only when n = 1)
  - let P = the set of all peers prepared to join the cluster
  - let D = the set of all peers in A that are depended on by a peer in P
  - let V = the sorted vector of (set-difference (set-union A X) D) by peer ID
  - if V is empty:
    - S sends an abort-join-cluster command to the log
    - when S encounters abort-join-cluster, it backs off and tries to join again later
  - let T = the nth in V of message-id mod (count V)
  - let W = the peer that T watches
  - T adds a watch to S
  - T sends a notify-join-cluster command to the log, notifying S that it is watched, adding S to P
  - when S encounters notify-join-cluster:
    - it adds a watch to W
    - it sends an accept-join-cluster command, removing S from P, adding S to A
  - when accept-join-cluster has been encountered, this peer is part of the cluster
  - S flushes its outbox of commands
  - T drops its watch from W - it is now redundant, as S is watching W

Peers 1 - 4 form a ring. Peer 5 wants to join. Continued below…

Peer 5 initiates the first phase of the join protocol. Peer 1 prepares to accept Peer 5 into the ring by adding a watch to it. Continued below…

Peer 5 initiates the second phase of the join protocol. Peer 5 notifies Peer 4 as a peer to watch. At this point, a stable "mini ring" has been stitched along the outside of the cluster. We note that the link between Peer 1 and 4 is extraneous. Continued below…

Peer 5 has been fully stitched into the cluster, and the ring is intact
Dead peer removal
Peers will fail, or be shut down purposefully. Onyx needs to:
- detect the downed peer
- inform all peers that this peer is no longer executing its task
- inform all peers that this peer is no longer part of the cluster
Peer Failure Detection Strategy
In a cluster of > 1 peer, when a peer dies another peer will have a watch registered on its znode to detect the ephemeral disconnect. When a peer fails (peer F), the peer watching the failed peer (peer W) needs to inform the cluster about the failure, and go watch the node that the failed node was watching (peer Z). The joining strategy that has been outlined forces peers to form a ring. A ring structure has an advantage because there is no coordination or contention as to who must now watch peer Z for failure. Peer W is responsible for watching Z, because W was watching F, and F was watching Z. Therefore, W transitively closes the ring, and W watches Z. All replicas can deterministically compute this answer without conferring with each other.

The nodes form a typical ring pattern. Peer 5 dies, and its connection with ZooKeeper is severed. Peer 1 reacts by reporting Peer 5’s death to the log. Continued below…

At t = 45, all of the replicas realize that Peer 5 is dead, and that Peer 1 is responsible for closing the gap by now watching Peer 4 to maintain the ring.

One edge case of this design is the simultaneous death of two or more consecutive peers in the ring. Suppose Peers 4 and 5 die at the exact same time. Peer 1 will signal Peer 5’s death, but Peer 5 never got the chance to signal Peer 4’s death. Continued below…

Peer 1 signals Peer 5's death, and closes the ring by adding a watch to Peer 4. Peer 4 is dead, but no one yet knows that. We circumvent this problem by first determining whether a peer is dead or not before adding a watch to it. If it's dead, as is Peer 4 in this case, we report it and further close the ring. Continued below…

Peer 1 signals Peer 4's death, and further closes the ring by adding a watch to Peer 3. The ring is now fully intact.
Peer Failure Detection Thread
There is a window of time (in between when a peer prepares to join the cluster and when its monitoring peer notifies the cluster of its presence) during which the monitoring node may fail, effectively deadlocking the new peer. This can occur because a peer will check whether its monitoring peer is dead during the prepare phase - essentially performing eviction on a totally dead cluster - and may find a false positive that a node is alive when it is actually dead. The root issue is that ephemeral znodes stick around for a short period of time after the creating process goes down. The new peer must watch its monitor until it delivers the second phase message for joining - notification. When this occurs, we can stop monitoring, because the monitoring node is clearly alive. If the znode is deleted because the process exited, we can safely evict it and free the peer from deadlocking. Issue 416 found this bug, and offers more context about the specific problem that we encountered.
Messaging
The messaging layer of Onyx employs the same technique that Apache Storm uses to achieve fault tolerance. Any errors are our own.
The Algorithm
Onyx guarantees that each segment read from an input task will be processed, providing at-least-once delivery semantics. Every segment that comes off an input task is given a UUID to track it through its lifetime. It is also given a peer ID that it uses as an "acking daemon", explained in more detail below. The segment also receives an initial "ack val". The ack val is a random 64-bit integer. Each time a segment is successfully processed at a task, this ack val is XOR'ed with itself. Further, any new segments that are generated as a result of this segment being processed are given random ack vals, too. These ack vals are also XOR'ed against the previous XOR value. When no new segments are generated, the result of XOR'ing all the segment ack vals returns 0. Finding 0 means that the segment has been successfully processed throughout the entire workflow.
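A toy illustration of the XOR bookkeeping (not Onyx's implementation), for the case where an input segment spawns exactly one child segment downstream:
(let [root-val  (long (rand Long/MAX_VALUE))   ; ack val assigned at the input task
      child-val (long (rand Long/MAX_VALUE))]  ; ack val for the generated child segment
  (-> root-val                       ; initial value when the root segment is read
      (bit-xor root-val child-val)   ; root processed: XOR its val and the new child's val
      (bit-xor child-val)))          ; child processed with no new segments
;; => 0, meaning the segment completed the entire workflow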
Acking Daemon
An acking daemon is a process that runs alongside each peer and maintains state. This state is a map of segment ID to another map. The map in the value maintains the current "ack val" and the peer to send completion messages to. When the ack val for a segment is set to zero, a message is sent to the appropriate peer to release the message from durable storage. This concludes the processing of the segment, and it is considered successful. Key/value pairs are periodically reaped if the peers that are operating on these segments are lost. If these values are reaped, the message is automatically replayed from the root of the workflow on the input task on a rolling basis.
We can depict all of this visually:

Phases of Execution
A batch of segments runs through the following phases of execution in sequential order:
- Inject resources: initiates any values for this particular batch
- Read message batch: reads and decompresses messages from the transport layer
- Tag messages: if these are messages from an input task, uniquely tags each segment to track it through its lifetime
- Timeout pool: if these are messages from an input task, adds these messages to a timeout pool to automatically expire on a preconfigured schedule
- Completion: checks if this job has been completed, and shuts down the job if so
- Strip sentinel: removes the sentinel if it's in this batch, signaling that the job may be completed
- Apply function: applies functions to the batch of segments
- Build new segments: creates and IDs new segments based on the received segments
- Write message batch: writes messages to the next peer or output medium
- Flow retry: causes messages that are force-retried via flow conditions to be replayed at the input task
- Ack messages: acknowledges the segments that have been processed to the acking daemon
- Close batch resources: closes any resources opened for this specific batch
Garbage collection
One of the primary obstacles that this design imposes is the requirement of seemingly infinite storage. Log entries are only ever appended - never mutated. If left running long enough, ZooKeeper will run out of space. Similarly, if enough jobs are submitted and either completed or killed, the in memory replica that each peer houses will grow too large. Onyx requires a garbage collector to be periodically invoked.
When the garbage collector is invoked, two things happen. First, the caller of gc places an entry onto the log. As each peer processes this log entry, it carries out a deterministic, pure function to shrink the replica. The second thing occurs when each peer invokes the side effects for this log entry. The caller will have specified a unique ID such that it is the only one that is allowed to trim the log. The caller takes the current replica (log entry N to this log entry) and stores it in an "origin" znode. Any time that a peer boots up, it first reads out of the origin location. Finally, the caller deletes log entry N through this log entry minus 1. This both frees storage in ZooKeeper and makes new peers start up faster, as they have less of the log to play. They begin in a "hot" state.
The garbage collector can be invoked by the public API function onyx.api/gc. Upon returning, the log will be trimmed, and the in-memory replicas will be compressed.
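For example, assuming a peer-config map is in scope:
(onyx.api/gc peer-config)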

A peer can start by reading out of the origin, and continue directly to a particular log location.
Command Reference
- Submitter: peer (P) that wants to join the cluster
- Purpose: determines which peer (Q) will watch P. If P is the only peer, it instantly fully joins the cluster
- Arguments: P's ID
- Replica update: assoc {Q P} to the :prepare key. If P is the only peer, P is immediately added to the :peers key, and no further reactions are taken
- Side effects: Q adds a ZooKeeper watch to P's pulse node
- Reactions: Q sends notify-join-cluster to the log, with args P and R (R being the peer Q watches currently)
- Submitter: peer Q helping to stitch peer P into the cluster
- Purpose: Adds a watch from P to R, where R is the node watched by Q
- Arguments: P and R's ids
- Replica update: assoc {Q P} to the :accept key, dissoc {Q P} from the :prepare key
- Side effects: P adds a ZooKeeper watch to R's pulse node
- Reactions: P sends accept-join-cluster to the log, with args P, Q, and R
- Submitter: peer P wants to join the cluster
- Purpose: confirms that P can safely join, Q can drop its watch from R, since P now watches R, and Q watches P
- Arguments: P, Q, and R's ids
- Replica update: dissoc {Q P} from the :accept key, merge {Q P} and {P R} into the :pairs key, conj P onto the :peers key
- Side effects: Q drops its ZooKeeper watch from R
- Reactions: peer P flushes its outbox of messages
- Submitter: virtual peer P wants to become active in the cluster
- Purpose: P affirms that its peer group has been safely stitched into the cluster
- Arguments: P's id
- Replica update: conj P into :peers, remove from :orphaned-peers
- Side effects: All virtual peers configure their workload and possibly start new tasks
- Reactions: none
- Submitter: peer (Q) determines that peer (P) cannot join the cluster (P may = Q)
- Purpose: Aborts P's attempt at joining the cluster, erases attempt from replica
- Arguments: P's id
- Replica update: Remove any :prepared or :accepted entries where P is a key's value
- Side effects: P optionally backs off for a period
- Reactions: P optionally sends :prepare-join-cluster to the log and tries again
- Submitter: peer (Q) reporting that peer P is dead
- Purpose: removes P from :prepared, :accepted, :pairs, and/or :peers, transitions Q's watch to R (the node P watches) and transitively closes the ring
- Arguments: peer ID of P
- Replica update: assoc {Q R} into the :pairs key, dissoc {P R}
- Side effects: Q adds a ZooKeeper watch to R's pulse node
- Submitter: virtual peer P is leaving the cluster
- Purpose: removes P from its task and consideration of any future tasks
- Arguments: peer ID of P
- Replica update: removes P from :peers
- Side effects: All virtual peers reconfigure their workloads for possibly new tasks
- Submitter: peer (P), who has seen the leader sentinel
- Purpose: P wants to propagate the sentinel to all downstream tasks
- Arguments: P's ID (:id), the job ID (:job), and the task ID (:task)
- Replica update: If this peer is allowed to seal, updates :sealing-task with the task ID associated with this peer's ID
- Side effects: Puts the sentinel value onto the queue
- Reactions: None
- Submitter: Client, via public facing API
- Purpose: Send a catalog and workflow to be scheduled for execution by the cluster
- Arguments: The job ID (:id), the task scheduler for this job (:task-scheduler), a topologically sorted sequence of tasks (:tasks), the catalog (:catalog), and the saturation level for this job (:saturation). Saturation denotes the number of peers this job can use, at most. This is typically Infinity, unless all catalog entries set :onyx/max-peers to an integer value. Saturation is then the sum of those numbers, since it creates an upper bound on the total number of peers that can be allocated to this task.
- Replica update:
- Side effects: None
- Reactions: If the job scheduler dictates that this peer should be reallocated to this job or another job, sends :volunteer-for-task to the log
- Submitter: Client, via public facing API
- Purpose: Stop all peers currently working on this job, and never allow this job's tasks to be scheduled for execution again
- Arguments: The job ID (:job)
- Replica update: Adds this job id to the :killed-jobs vector, removes any peers in :allocations for this job's tasks. Switches the :peer-state for all peers executing a task for this job to :idle.
- Side effects: If this peer is executing a task for this job, stops the current task lifecycle
- Reactions: If this peer is executing a task for this job, reacts with :volunteer-for-task
- Submitter: Client, via public facing API
- Purpose: Compress all peer local replicas and trim old log entries in ZooKeeper.
- Arguments: The caller ID (:id)
- Replica update: Clears out all data in all keys about completed and killed jobs - as if they never existed.
- Side effects: Deletes all log entries before this command's entry, creates a compressed replica at a special origin log location, and updates the pointer to the origin
- Reactions: None
- Submitter: peer (P), who has successfully started its incoming buffer
- Purpose: Indicates that this peer is ready to receive segments as input
- Replica update: Updates :peer-state under the :id of this peer to set its state to :active.
- Side effects: If this task should immediately be sealed, seals this task
- Reactions: None.
- Submitter: This is a special entry that should never be appended to the log
- Purpose: Perform a hard reset of the replica, replacing its entire value. This is useful if a log subscriber is reading behind a garbage collection call and tries to read a non-existent entry. The new origin can be found and its value applied locally via the subscriber.
- Replica update: Replaces the entire value of the replica with a new value
- Side effects: None.
- Reactions: None.
- Submitter: Peer that performs log initialization.
- Purpose: Sets an identifier to the ledger that will track the state represented by this log.
- Replica update: Updates :state-logs with the id.
- Side effects: None.
- Reactions: None.
APIs
Onyx ships with two distinct APIs to accommodate different needs. A description of each follows.
Core API
The Core API is used to start/stop resources, jobs, and monitor job progress. It's accessible through the onyx.api namespace.
start-env
Starts a development environment with in-memory ZooKeeper. Helpful for developing locally without needing to start any other services.
start-peer-group
Starts a resource pool to be shared across a group of peers. You should only start one peer group per physical machine.
start-peers
Starts N virtual peers to execute tasks. In a production environment, you should start by booting up N virtual peers for N cores on the physical machine. Tune performance from there.
submit-job
Submits a job to Onyx to be scheduled for execution. Takes a map with
keys :catalog
, :workflow
, :flow-conditions
, :windows
,
:triggers
, :metadata
, and :task-scheduler
. Returns a map of
:job-id
and :task-ids
, which map to a UUID and vector of maps
respectively. :metadata
is a map of values that must serialize to EDN.
:metadata
will be logged with all task output, and is useful for
identifying a particular task based on something other than its name or
ID.
Additionally, :metadata
may optionally contain a :job-id
key. When
specified, this key will be used for the job ID instead of a randomly
chosen UUID. Repeated submissions of a job with the same :job-id will be
treated as an idempotent action. If a job with the same ID has been
submitted more than once, the original task IDs associated with the
catalog will be returned, and the job will not run again, even if it has
been killed or completed. It is undefined behavior to submit two jobs
with the same :job-id metadata whose :workflow, :catalog,
:flow-conditions, etc are not equal.
await-job-completion
Given a job ID, blocks the calling thread until all the tasks for this job have been completed.
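A minimal end-to-end sketch of the Core API, assuming env-config, peer-config, workflow, and catalog are defined elsewhere:
(require '[onyx.api])

(let [env        (onyx.api/start-env env-config)
      peer-group (onyx.api/start-peer-group peer-config)
      peers      (onyx.api/start-peers 3 peer-group)
      {:keys [job-id]} (onyx.api/submit-job
                         peer-config
                         {:workflow workflow
                          :catalog catalog
                          :task-scheduler :onyx.task-scheduler/balanced})]
  (onyx.api/await-job-completion peer-config job-id)
  ;; Tear everything down once the job has finished.
  (doseq [v-peer peers]
    (onyx.api/shutdown-peer v-peer))
  (onyx.api/shutdown-peer-group peer-group)
  (onyx.api/shutdown-env env))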
Tip: Example project: block-on-job-completion
gc
Invokes the garbage collector. Compresses the replica in ZooKeeper, freeing up storage and deleting log history. Frees up memory on the local, in-memory replica on all peers.
kill-job
Stops this job from executing, never allowing it to be run again.
Tip: Example project: kill-job
subscribe-to-log
Sends all events in the log to a core.async channel. Events are received in the order that they appeared in the log. Starts from the beginning of the log, blocking until more entries are available.
Peer Pipeline API
The Peer Pipeline API allows you to interact with data storage mediums to read and write data for plugins.
seal-resource
Called by one peer exactly once (subsequent calls occur if the sealing peer fails) when the task is completing. Close out target output resources.
Functions
This section outlines how Onyx programs execute behavior. Onyx uses plain Clojure functions to carry out distributed activity. You have the option of performing grouping and aggregation on each function.
Functional Transformation
A Function is a construct that takes a segment as a parameter and outputs a segment or a seq of segments. Functions are meant to literally transform a single unit of data in a functional manner. The following is an example of a function:
(defn my-inc [{:keys [n] :as segment}]
(assoc segment :n (inc n)))
Note that you may only pass segments between functions - no other shape of data is allowed.
Tip: Example project: filtering
Function Parameterization
A function can be parameterized before a job is submitted to Onyx. The segment is always the last argument to the function. There are multiple ways to parameterize a function, and they can be used in combination.
- Via the catalog :onyx/params entry
(def catalog
  {...
   :my/param-1 "abc"
   :my/param-2 "def"
   :onyx/params [:my/param-1 :my/param-2]
   ...})
The function is then invoked with (partial f "abc" "def"). The order is controlled by the vector of :onyx/params.
- Via :onyx.core/params in the before-task-start lifecycle hook
(defn before-task-start-hook [event lifecycle]
{:onyx.core/params [42]})
The function is then invoked with (partial f 42). Using this approach "hard sets" the parameters list. Other parameters may already exist in :onyx.core/params. If you want to retain those parameters, concat them together and return the new value on :onyx.core/params.
- Via the :onyx.peer/fn-params peer configuration
(def peer-opts
{...
:onyx.peer/fn-params {:my-fn-name [64]}})
The function is then invoked with (partial f 64). This approach is useful for parameterizing a task regardless of which job it is in. If both :onyx.peer/fn-params and :onyx/params are set for the same task, they are concatenated together, with fn-params coming first.
Tip: Example projects: parameterized, interface-injection, catalog-parameters
Grouping & Aggregation
Grouping ensures that "like" values are always routed to the same
virtual peer, presumably to compute an aggregate. Grouping is specified
inside of a catalog entry. There are two ways to group: by key of
segment, or by arbitrary function. Grouping by key is a convenience that
will reach into each segment and pin all segments with the same key
value in the segment together. Grouping functions receive a single
segment as input. The output of a grouping function is the value to
group on. Grouped functions must set the keys :onyx/min-peers and :onyx/flux-policy. See below for a description of these.
Group By Key
To group by a key or a vector of keys in a segment, use :onyx/group-by-key in the catalog entry:
{:onyx/name :sum-balance
:onyx/fn :onyx.peer.kw-grouping-test/sum-balance
:onyx/type :function
:onyx/group-by-key :name
:onyx/min-peers 3
:onyx/flux-policy :continue
:onyx/batch-size 1000}
Group By Function
To group by an arbitrary function, use :onyx/group-by-fn in the catalog entry:
{:onyx/name :sum-balance
:onyx/fn :onyx.peer.fn-grouping-test/sum-balance
:onyx/type :function
:onyx/group-by-fn :onyx.peer.fn-grouping-test/group-by-name
:onyx/min-peers 3
:onyx/flux-policy :continue
:onyx/batch-size 1000}
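The grouping function itself is an ordinary Clojure function that receives a segment and returns the value to group on. A sketch of what the function referenced above might look like (the real test function isn't shown here):
(defn group-by-name [{:keys [name] :as segment}]
  name)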
Flux Policies
Functions that use the grouping feature are presumably stateful. For this reason, unless :continue is used, once a job begins, no matter how many peers are added to the cluster, no new peers will be allocated to grouping tasks. When more peers are added after the job begins, the hashing algorithm loses its consistency, and stateful operations won't work correctly.
Given that Onyx will not add more peers to regular grouping tasks after a job begins, we introduce a new parameter - :onyx/min-peers. This should be set to an integer that indicates the minimum number of peers that will be allocated to this task before the job can begin. Onyx may schedule more than the minimum number that you set. You can create an upper bound by also using :onyx/max-peers.
Tip: Example project: max-peers.
One concern that immediately needs to be handled is what happens if a peer on a grouping task leaves the cluster after the job has begun. Clearly, removing a peer from a grouping task also breaks the consistent hashing algorithm that supports statefulness. The policy that is enforced is configurable, and must be chosen by the developer. We offer two policies, outlined below.
Continue Policy
When :onyx/flux-policy is set to :continue on a catalog entry, the hashing algorithm may be inconsistent. Peers can leave or join a task at any point in time. This is desirable for streaming jobs where the data is theoretically infinite, or for tasks that benefit from grouping but are not stateful.
Bulk Functions
Sometimes you might be able to perform a function more efficiently over a batch of segments rather than processing one segment at a time, such as writing segments to a database in a non-output task. You can receive the entire batch of segments in bulk as an argument to your task by setting :onyx/bulk? to true in your catalog entry for your function. Onyx will ignore the output of your function and pass the same segments that you received downstream. The utility of this feature is that you receive the entire batch in one shot. Onyx ignores your output because it would make it impossible to track which specific messages are children of particular upstream messages - breaking Onyx's fault tolerance mechanism.
Functions with this key enabled may not be used with flow conditions. These segments are passed to all immediate downstream tasks.
An example catalog entry:
{:onyx/name :inc
:onyx/fn :onyx.peer.batch-function-test/my-inc
:onyx/type :function
:onyx/bulk? true
:onyx/batch-size batch-size}
And an example catalog function to correspond to this entry:
(defn my-inc [segments]
(prn segments)
:ignored-return-value)
The default value for this option is false.
Leaf Functions
Sometimes you're going to want a node in your workflow with no outgoing connections that doesn't perform I/O against a database. You can do this by setting :onyx/type to :output, :onyx/medium to :function, and :onyx/plugin to :onyx.peer.function/function. Then you can specify an :onyx/fn pointing to a regular Clojure function. For example:
{:onyx/name :leaf-task
:onyx/fn ::add-to-results
:onyx/plugin :onyx.peer.function/function
:onyx/medium :function
:onyx/type :output
:onyx/batch-size 20}
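A hedged sketch of the ::add-to-results function referenced above; the atom is assumed here purely for illustration:
(def results (atom []))

(defn add-to-results [segment]
  ;; Record the segment somewhere observable, then return it unchanged.
  (swap! results conj segment)
  segment)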
Flow Conditions
This section covers flow conditions. Flow conditions are used for isolating logic about whether or not segments should pass through different tasks in a workflow, and support a rich degree of composition with runtime parameterization.
Summary
Workflows specify the structure of your computation as a directed, acyclic graph. A workflow describes all possible routes that a segment can take as it enters your workflow. On the other hand, we often have the need to specify how an individual segment moves throughout your workflow. Many times, a segment conditionally moves from one task to another. This is a concept that Onyx takes apart and turns into its own idea, independent of the rest of your computation. They’re called Flow Conditions. It should be mentioned straight away that Flow Conditions are entirely optional, and your program can ignore them entirely if you’d like. Omitting them leads to the default behavior, which sends a segment to all immediate downstream tasks.
Motivating Example
The easiest way to learn how to use flow conditions is to see an example. Suppose we have the following workflow snippet:
[[:input-stream :process-children]
[:input-stream :process-adults]
[:input-stream :process-female-athletes]
[:input-stream :process-everyone]
...]
This workflow takes some input in (presumably a stream of people), and directs segments to four possible tasks - :process-children, :process-adults, :process-female-athletes, and :process-everyone. Suppose we want to conditionally direct a segment to zero or more of these tasks, depending on some predicates. We use flow conditions to carry out this work. Flow conditions are their own data structure that are bundled along with the workflow and catalog to onyx.api/submit-job (with key :flow-conditions). Here's an example of what a flow conditions data structure would look like for our proposed workflow:
[{:flow/from :input-stream
:flow/to [:process-children]
:my/max-child-age 17
:flow/predicate [:my.ns/child? :my/max-child-age]
:flow/doc "Emits segment if this segment is a child."}
{:flow/from :input-stream
:flow/to [:process-adults]
:flow/predicate :my.ns/adult?
:flow/doc "Emits segment if this segment is an adult."}
{:flow/from :input-stream
:flow/to [:process-female-athletes]
:flow/predicate [:and :my.ns/female? :my.ns/athlete?]
:flow/doc "Emits segment if this segment is a female athlete."}
{:flow/from :input-stream
:flow/to [:process-everyone]
:flow/predicate :my.ns/constantly-true
:flow/doc "Always emit this segment"}]
The basic idea is that every entry in the Flow Conditions data structure denotes a relationship between a task and its downstream tasks. :flow/from indicates the task that the segment is leaving, and :flow/to indicates the tasks that the segment should be sent to if the predicate evaluates to true. The predicate is denoted by :flow/predicate, which is a keyword or sequence of keywords that are resolved to a function. Later in this section, we'll cover how exactly the predicate function is constructed.
There is one flow conditions data structure per job - that is, there is one vector of maps. The order that you specify the flow conditions in matters. More on that later in this section.
Tip: Example project: flow-combine
Predicate Function Signatures
A predicate function is a Clojure function that takes at least four parameters - a context map, the old segment, the new segment, and the collection of all new segments produced from the old segment. Predicates can take parameters at runtime. They will be appended to the end of the function invocation. See Predicate Parameters for further discussion of how to use runtime parameters.
Predicates for the above examples can be seen below:
(defn child? [event old-segment new-segment all-new max-age]
(<= (:age new-segment) max-age))
(defn adult? [event old-segment new-segment all-new]
(>= (:age new-segment) 18))
(defn female? [event old-segment new-segment all-new]
(= (:gender new-segment) "Female"))
(defn athlete? [event old-segment new-segment all-new]
(= (:job new-segment) "athlete"))
(def constantly-true (constantly true))
Predicate Parameters
Predicate functions can take parameters at runtime. In this first flow condition, we use the parameter :my/max-child-age and set its value to 17. We pass this value to the predicate by surrounding it with brackets, as in: [:my.ns/child? :my/max-child-age]. The parameters are appended to the end of the function call to the predicate. See Predicate Function Signatures in this section for the arguments that are passed into the predicate on every invocation, regardless of parameterization.
Key Exclusion
Sometimes, the decision of whether to allow a segment to pass through to the next task depends on some side effects that were a result of the original segment transformation. Onyx allows you to handle this case by adding extra keys to the segment that comes out of the transformation function. These extra keys are visible in your predicate function, and are then stripped off before being sent to the next task. You can indicate these "extra keys" by setting :flow/exclude-keys to a vector of keys.
For example, if we had the following transformation function:
(defn my-function [x]
(assoc x :result 42 :side-effects-result :blah))
Our predicate for flow conditions might need to use the :side-effects-result to make a decision. We don't want to actually send that information out to the next task, though - so we use :flow/exclude-keys on :side-effects-result to make it disappear after the predicate result has been realized.
{:flow/from :input-stream
:flow/to [:process-adults]
:flow/predicate :my.ns/adult?
:flow/exclude-keys [:side-effects-result]
:flow/doc "Emits segment if this segment is an adult."}
Tip: Example project: flow-exclude-keys
Predicate Composition
One extraordinarily powerful feature of Flow Conditions is its composition characteristics. Predicates can be composed with logical and, or, and not. We use composition to check if the segment is both female and an athlete in [:and :my.ns/female? :my.ns/athlete?]. Logical function calls must be surrounded with brackets, and may be nested arbitrarily. Functions inside of logical operator calls may be parameterized, as in [:and :my.ns/female? [:my.ns/child? :my/max-child-age]]. Parameters may not specify logical functions.
Tip: Example project: flow-predicate-composition
Match All/None
Sometimes, you want a flow condition that emits a value to all tasks if the predicate is true. You can use shorthand to emit to all downstream tasks:
{:flow/from :input-stream
:flow/to :all
:flow/short-circuit? true
:flow/predicate :my.ns/adult?}
Similarly, sometimes you want to emit to no downstream tasks:
{:flow/from :input-stream
:flow/to :none
:flow/short-circuit? true
:flow/predicate :my.ns/adult?}
If a flow condition specifies :all as its :flow/to, it must come before any other flow conditions. If a flow condition specifies :none as its :flow/to, it must come directly behind an :all condition, or first if there is no :all condition. This is because of the semantics of short circuiting. We'll discuss what short circuiting means later in this section.
:flow/to set to :all or :none must always set :flow/short-circuit? to true.
:flow/from may be set to :all. This directs all immediate upstream links to pass segments to this task's flow condition. :flow/from as :all does not impose order constraints as :flow/to set to :all does.
Short Circuiting
If multiple flow condition entries evaluate to a true predicate, their :flow/to values are unioned (duplicates aren't acknowledged), as well as their :flow/exclude-keys. Sometimes you don't want this behavior, and you want to specify exactly the downstream tasks to emit to - and not check any more flow condition entries. You can do this with :flow/short-circuit? set to true. Any entry that has :flow/short-circuit? set to true must come before any entries for a task that have it set to false or nil.
Tip: Example project: flow-short-circuit
Exceptions
Flow Conditions give you leverage for handling exceptions without miring your code in try/catch logic. If an exception is thrown from an Onyx transformation function, you can capture it from within your flow conditions by setting :flow/thrown-exception? to true. Its default value is false. If an exception is thrown, only flow conditions with :flow/thrown-exception? set to true will be evaluated. The value that is normally the segment which is sent to the predicate will be the exception object that was thrown. Exception flow conditions must have :flow/short-circuit? set to true. Note that exceptions don't serialize. This feature is meant to be used in conjunction with Post-transformations and Actions for sending exception values to downstream tasks.
{:flow/from :input-stream
:flow/to [:error-task]
:flow/short-circuit? true
:flow/thrown-exception? true
:flow/predicate :my.ns/handle-error?}
And the predicate might be:
(defn handle-error? [event old ex-obj all-new]
(= (type ex-obj) java.lang.NullPointerException))
This will only restrict the flow from :input-stream to :error-task when an exception is thrown - see the discussion of Short Circuiting above. When an exception is not thrown, the default behaviour will apply. For example, if there are later flow conditions, they will apply; if there are no other flow conditions for that task, the segment will flow through to all downstream tasks.
Post-transform
Post-transformations are an extension provided to handle segments that cause exceptions to arise. If a flow condition has :flow/thrown-exception? set to true, it can also set :flow/post-transform to a keyword. This keyword must have the value of a fully namespace-qualified function on the classpath. This function will be invoked with three parameters: the event map, the segment that caused the exception, and the exception object. The result of this function, which must be a segment, will be passed to the downstream tasks. This allows you to come up with a reasonable value to pass downstream when you encounter an exception, since exceptions don't serialize anyway. :flow/exclude-keys will be applied to the resulting transformed segment.
Example:
{:flow/from :input-stream
:flow/to [:error-task]
:flow/short-circuit? true
:flow/thrown-exception? true
:flow/post-transform :my.ns/post-transform
:flow/predicate :my.ns/handle-error?}
And an example post-transform function might be:
(defn post-transform [event segment exception-obj]
{:error :my-exception-value})
Actions
After a set of flow conditions has been evaluated for a segment, you usually want to send the segment downstream to the next set of tasks. Other times, you want to retry processing the segment because something went wrong. Perhaps a database connection wasn't available, or an email couldn't be sent.
Onyx provides the Flow Conditions :flow/action to accomplish this. By setting :flow/action to :retry, a segment will expire from the internal pool of pending messages and be automatically retried from its input task. If any of the :flow/action values from the matching flow conditions are :retry, the segment will be retried and will not be sent downstream. This parameter is optional, and its default value is nil. nil will cause the segment to be sent to all downstream tasks that were selected from evaluating the flow conditions. Any flow condition clauses with :flow/action set to :retry must also have :flow/short-circuit? set to true, and :flow/to set to :none.
Here’s a quick example:
[{:flow/from :input-stream
:flow/to :none
:flow/short-circuit? true
:flow/predicate :my.ns/adult?
:flow/action :retry}
{:flow/from :input-stream
:flow/to [:task-a]
:flow/predicate :my.ns/child?}]
Messaging
Background
The messaging layer takes care of the direct peer to peer transfer of segment batches, acks, segment completion and segment retries to the relevant virtual peers.
Messaging Implementations
The Onyx messaging implementation is pluggable, and alternative implementations can be selected via the :onyx.messaging/impl Peer Configuration option.
Aeron Messaging
Owing to Aeron’s high throughput and low latency, Aeron is the default Onyx messaging implementation. There are a few relevant considerations when using the Aeron implementation.
Subscription (Connection) Multiplexing
One issue when scaling Onyx to a many-node cluster is that every virtual peer may require a communications channel to any other virtual peer. As a result, a naive implementation will require up to m*m connections over the cluster, where m is the number of virtual peers. By sharing Aeron subscribers between virtual peers on a node, this can be reduced to n*n connections, where n is the number of nodes. This reduces the amount of overhead required to maintain connections between peers, allowing the cluster to scale better as the number of nodes increases.
It is worth noting that Aeron subscribers (receivers) must also generally perform deserialization. Therefore, subscribers may become CPU bound by the amount of deserialization work that needs to be performed. In order to reduce this effect, multiple subscribers can be instantiated per node. This can be tuned via :onyx.messaging.aeron/subscriber-count in the Peer Configuration. As increasing the number of subscribers may lead back to an undesirable growth in the number of connections between nodes, each node will only choose one subscription to communicate through. The choice of subscriber is calculated via a hash of the combined IPs of the communicating nodes, in order to consistently spread the use of subscribers over the cluster.
Clusters which spend a large proportion of their time serializing should consider increasing the subscriber count. As a general guide, cores = virtual peers + subscribers.
Connection Short Circuiting
When virtual peers are co-located on the same node, messaging will bypass the use of Aeron and directly communicate the message without any use of the network and without any serialization. Therefore, performance benchmarks performed on a single node can be very misleading.
The Peer Configuration option :onyx.messaging/allow-short-circuit? is provided for the purposes of more realistic performance testing on a single node.
Port Use
The Aeron messaging implementation will use the port configured via :onyx.messaging/peer-port. This UDP port must be unfirewalled.
Media Driver
Aeron requires a media driver to be used on each node. Onyx provides an embedded media driver for local testing, however use of the embedded driver is not recommended in production. The embedded driver can be configured via the :onyx.messaging.aeron/embedded-driver? Peer Configuration option.
When using Aeron messaging in production, a media driver should be created in another java process. You can do this via the following code snippet, or by using the Aeron distribution.
(ns your-app.aeron-media-driver
(:require [clojure.core.async :refer [chan <!!]])
(:import [uk.co.real_logic.aeron Aeron$Context]
[uk.co.real_logic.aeron.driver MediaDriver MediaDriver$Context ThreadingMode]))
(defn -main [& args]
(let [ctx (doto (MediaDriver$Context.))
media-driver (MediaDriver/launch ctx)]
(println "Launched the Media Driver. Blocking forever...")
(<!! (chan))))
Configuration Options
Aeron is independently configurable via Java properties (e.g. JAVA_OPTS="-Daeron.mtu.length=16384"). Configuration of these may cause different performance characteristics, and certain options may need to be configured in order to communicate large segments between peers.
Documentation for these configuration options can be found in Aeron’s documentation.
Lifecycles
Lifecycles are a feature that allow you to control code that executes at particular points during task execution on each peer. Lifecycles are data driven and composable.
Summary
There are several interesting points at which to execute arbitrary code during a task in Onyx. Onyx lets you plug in and call functions before a task, after a task, before a batch, and after a batch on every peer. Additionally, there is another lifecycle hook that allows you to delay starting a task in case you need to do some work like acquiring a lock under contention. A peer's lifecycle is isolated to itself, and lifecycles never coordinate across peers. Usage of lifecycles is entirely optional. Lifecycle data is submitted as a data structure at job submission time.
Lifecycle Phases
Before task set up
A function that takes two arguments - an event map, and the matching lifecycle map. Must return a boolean value indicating whether to start the task or not. If false, the process backs off for a preconfigured amount of time and calls this function again. Useful for lock acquisition. This function is called prior to any processes inside the task becoming active.
Before task execution
A function that takes two arguments - an event map, and the matching lifecycle map. Must return a map that is merged back into the original event map. This function is called after processes in the task are launched, but before the peer listens for incoming segments from other peers.
Before segment batch start
A function that takes two arguments - an event map, and the matching lifecycle map. Must return a map that is merged back into the original event map. This function is called prior to receiving a batch of segments from the reading function.
After read segment batch
A function that takes two arguments - an event map, and the matching
lifecycle map. Must return a map that is merged back into the original
event map. This function is called immediately after a batch of segments
has been read by the peer. The segments are available in the event map
by the key :onyx.core/batch
.
After segment batch
A function that takes two arguments - an event map, and the matching lifecycle map. Must return a map that is merged back into the original event map. This function is called after all messages have been written and acknowledged.
After task execution
A function that takes two arguments - an event map, and the matching lifecycle map. Must return a map that is merged back into the original event map. This function is called before the peer relinquishes its task. No more segments will be received.
After ack message
A function that takes four arguments - an event map, a message id, the return of an input plugin ack-segment call, and the matching lifecycle map. May return a value of any type which will be discarded. This function is called whenever a segment at the input task has been fully acked.
After retry message
A function that takes four arguments - an event map, a message id, the return of an input plugin ack-segment call, and the matching lifecycle map. May return a value of any type which will be discarded. This function is called whenever a segment at the input task has been pending for greater than pending-timeout time and will be retried.
Handle Exception
If an exception is thrown during any lifecycle execution except
after-task-stop
, one or more lifecycle handlers may be defined. If
present, the exception will be caught and passed to this function. See
the details on the
Onyx
cheat sheet.
Example
Let’s work with an example to show how lifecycles work. Suppose you want to print out a message at all the possible lifecycle hooks. You’d start by defining 9 functions for the 9 hooks:
(ns my.ns)
(defn start-task? [event lifecycle]
(println "Executing once before the task starts.")
true)
(defn before-task-start [event lifecycle]
(println "Executing once before the task starts.")
{})
(defn after-task-stop [event lifecycle]
(println "Executing once after the task is over.")
{})
(defn before-batch [event lifecycle]
(println "Executing once before each batch.")
{})
(defn after-read-batch [event lifecycle]
(println "Executing once after this batch has been read.")
{})
(defn after-batch [event lifecycle]
(println "Executing once after each batch.")
{})
(defn after-ack-segment [event message-id rets lifecycle]
(println "Message " message-id " is fully acked"))
(defn after-retry-segment [event message-id rets lifecycle]
(println "Retrying message " message-id))
(defn handle-exception [event lifecycle lifecycle-phase e]
(println "Caught exception: " e)
(println "Returning :restart, indicating that this task should restart.")
:restart)
Notice that most of the lifecycle functions return maps; the exceptions are start-task?, after-ack-segment, after-retry-segment, and handle-exception. Each returned map is merged back into the event parameter that the function received.
start-task?
is a boolean function that allows you to block and back
off if you don’t want to start the task quite yet. This function will be
called periodically as long as false
is returned. If more than one
start-task?
is specified in your lifecycles, they must all return
true
for the task to begin. start-task?
is invoked before
before-task-start
.
Next, define a map that wires all these functions together by mapping predefined keywords to the functions:
(def calls
{:lifecycle/start-task? start-task?
:lifecycle/before-task-start before-task-start
:lifecycle/before-batch before-batch
:lifecycle/after-read-batch after-read-batch
:lifecycle/after-batch after-batch
:lifecycle/after-task-stop after-task-stop
:lifecycle/after-ack-segment after-ack-segment
:lifecycle/after-retry-segment after-retry-segment
:lifecycle/handle-exception handle-exception})
Each of these 9 keys maps to a function. All of these keys are optional, so you can mix and match depending on which functions you actually need to use.
Finally, create a lifecycle data structure by pointing
:lifecycle/calls
to the fully qualified namespaced keyword that
represents the calls map that we just defined. Pass it to your
onyx.api/submit-job
call:
(def lifecycles
[{:lifecycle/task :my-task-name-here
:lifecycle/calls :my.ns/calls
:lifecycle/doc "Test lifecycles and print a message at each stage"}])
(onyx.api/submit-job
 peer-config
 {...
  :lifecycles lifecycles
  ...})
It is also possible to have a lifecycle apply to every task in a
workflow by specifying :lifecycle/task :all
. This is useful for
instrumenting your tasks with metrics, error handling, or debugging
information.
(def lifecycles
[{:lifecycle/task :all
:lifecycle/calls :my.ns/add-metrics
:lifecycle/doc "Instruments all tasks in a workflow with the example function 'add-metrics'"}])
You can supply as many sets of lifecycles as you want. They are invoked in the order that they are supplied in the vector, giving you a predictable sequence of calls. Be sure that all the keyword symbols and functions are required onto the classpath for the peer that will be executing them.
Tip: Example project: lifecycles
Backpressure
A common problem with streaming platforms is whether peers and tasks can exhibit backpressure to upstream tasks.
When Onyx’s internal messaging buffers overflow, the oldest segments in the buffer are discarded i.e. a sliding buffer. While this ensures that the freshest segments are likely to make it through the entire workflow, and be fully acked, input segments are still likely to be retried.
One important form of backpressure is the :onyx/max-pending task parameter (see the Information Model), which may be configured on input tasks. Input tasks will only produce new segments if there are fewer than max-pending pending (i.e. not fully acked) input segments.
One problem with max-pending as a form of backpressure is that it doesn’t take into account the number of segments produced by the intermediate tasks, nor whether these segments are filling up the inbound buffers of the later tasks (due to slow processing, large numbers of produced segments, etc).
Onyx uses a simple scheme to allow backpressure to push back to upstream input tasks. When a virtual peer fills up past a high water mark, the peer writes a log message to say that its internal buffers are filling up (backpressure-on). If any peer currently allocated to a job has set backpressure-on, then all peers allocated to input tasks will stop reading from the input sources.
When a peer’s messaging buffer is reduced to below a low water mark, it writes a backpressure-off log message. If no peers allocated to a job are currently set to backpressure, then peers allocated to input tasks will resume reading from their input sources.
Refer to Peer Config for information regarding the default backpressure settings, and how to override them.
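As a sketch, the relevant thresholds can be overridden in the peer-config; the keys below are described in the Peer Configuration chapter, and the values are illustrative rather than recommended defaults:
(def peer-config
  {:onyx/tenancy-id "my-tenancy"
   ;; resume reading from input sources when buffers drop below 30% full
   :onyx.peer/backpressure-low-water-pct 30
   ;; signal backpressure-on when buffers exceed 60% full
   :onyx.peer/backpressure-high-water-pct 60
   ;; check buffer levels every 10 ms
   :onyx.peer/backpressure-check-interval 10})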
Windowing and Aggregation
This section discusses a feature called windowing. Windows allow you to group and accrue data into possibly overlapping buckets. Windows are intimately related to the Triggers feature. When you’re finished reading this section, head over to the Triggers chapter next.
Summary
Windowing splits up a possibly unbounded data set into finite, possibly overlapping portions. Windows allow us to create aggregations over distinct portions of a stream, rather than stalling and waiting for the entire data set to arrive. In Onyx, windows strictly describe how data is accrued. When you want to do something with the windowed data, you use a Trigger. See the chapter on Triggers for more information. Onyx’s windowing mechanisms are strong enough to handle stream disorder. If your data arrives in an order that isn’t "logical" (for example, :event-time keys moving backwards in time), Onyx can sort out the appropriate buckets to put the data in.
Window Types
The topic of windows has been widely explored in the literature. There are different types of windows. Currently, Onyx supports Fixed, Sliding, Global, and Session windows. We will now explain the supported window types.
Fixed Windows
Fixed windows, sometimes called Tumbling windows, span a particular
range and do not slide. That is, fixed windows never overlap one
another. Consequently, a data point will fall into exactly one instance
of a window (often called an extent in the literature). As it turns
out, fixed windows are a special case of sliding windows where the range
and slide values are equal. You can see a visual below of how this
works, where the |--|
drawings represent extents. Each window is of
range 5
. Time runs horizontally, while the right-hand side features
the extent bound running vertically. The first extent captures all
values between 0 and 4.99999…
1, 5, 10, 15, 20, 25, 30, 35, 40
|--| [0 - 4]
|--| [5 - 9]
|---| [10 - 14]
|---| [15 - 19]
|---| [20 - 24]
|---| [25 - 29]
|---| [30 - 34]
|---| [35 - 39]
Example:
{:window/id :collect-segments
:window/task :identity
:window/type :fixed
:window/aggregation :onyx.windowing.aggregation/count
:window/window-key :event-time
:window/range [5 :minutes]}
Tip: Example project: fixed-windows
Sliding Windows
In contrast to fixed windows, sliding windows allow extents to overlap.
When a sliding window is specified, we have to give it a range for which
the window spans, and a slide value for how long to wait between
spawning a new window extent. Every data point will fall into exactly
range / slide
number of window extents. We draw out what this looks
like for a sliding window with range 15
and slide 5
:
1, 5, 10, 15, 20, 25, 30, 35, 40
|---------| [0 - 14]
|----------| [5 - 19]
|-----------| [10 - 24]
|-----------| [15 - 29]
|-----------| [20 - 34]
|-----------| [25 - 39]
Example:
{:window/id :collect-segments
:window/task :identity
:window/type :sliding
:window/aggregation :onyx.windowing.aggregation/conj
:window/window-key :event-time
:window/range [15 :minutes]
:window/slide [5 :minute]}
Tip: Example project: sliding-windows
Global Windows
Global windows are perhaps the easiest to understand. With global windows, there is exactly one window extent that matches all data that enters it. This lets you capture events that span an entire domain of time. Global windows are useful for modeling batch or timeless computations.
<- Negative Infinity Positive Infinity ->
|-------------------------------------------------------|
Example:
{:window/id :collect-segments
:window/task :identity
:window/type :global
:window/aggregation :onyx.windowing.aggregation/count
:window/window-key :event-time}
Tip: Example project: global-windows
Session Windows
Session windows are windows that dynamically resize their upper and lower bounds in reaction to incoming data. Sessions capture a span of activity for a specific key, such as a user ID. If no activity occurs within a timeout gap, the session closes. If an event occurs within the bounds of a session, it is fused into the window, and the session is extended by its timeout gap in either the forward or backward direction.
For example, if events with the same session key occurred at 5
, 7
,
and 20
, and the session window used a timeout gap of 5, the windows
would look like the following:
1, 5, 10, 15, 20, 25, 30, 35, 40
|-| [5 - 7]
| [20 - 20]
Windows that aren’t fused to anything are single points in time (see 20). If an event occurs within another event’s timeout gap on the timeline, the two fuse, as 5 and 7 do.
Example:
{:window/id :collect-segments
:window/task :identity
:window/type :session
:window/aggregation :onyx.windowing.aggregation/conj
:window/window-key :event-time
:window/session-key :id
:window/timeout-gap [5 :minutes]}
Tip: Example project: session-windows
Units
Onyx allows you to specify range and slide values in different magnitudes of units, so long as the units can be converted to the same unit in the end. For example, you can specify the range in minutes and the slide in seconds. Any value that requires units takes a vector of two elements. The first element represents the value, and the second the unit. For example, window specifications denoting range and slide might look like:
{:window/range [1 :minute]
:window/slide [30 :seconds]}
See the information model for all supported units. You can use a
singular form (e.g. :minute
) instead of the plural (e.g. :minutes
)
where it makes sense for readability.
Onyx is also capable of sliding by :elements
. This is often referred
to as "slide-by-tuple" in research. Onyx doesn’t require a time-based
range and slide value. Any totally ordered value will work equivalently.
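A hypothetical slide-by-tuple window, bucketing the last 10 segments and spawning a new extent every 3 segments, might look like the following. The :event-id window key is an assumption about your data; any totally ordered key will do:
{:window/id :collect-segments
 :window/task :identity
 :window/type :sliding
 :window/aggregation :onyx.windowing.aggregation/conj
 :window/window-key :event-id
 :window/range [10 :elements]
 :window/slide [3 :elements]}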
Aggregation
Windows allow you to accrue data over time. Sometimes you want to store all of the data; other times you want to incrementally compact it.
Window specifications must provide a :window/aggregation
key. We’ll go
over an example of every type of aggregation that Onyx supports.
:onyx.windowing.aggregation/conj
The :conj
aggregation is the simplest. It collects segments for this
window and retains them in a vector, unchanged.
{:window/id :collect-segments
:window/task :identity
:window/type :sliding
:window/aggregation :onyx.windowing.aggregation/conj
:window/window-key :event-time
:window/range [30 :minutes]
:window/slide [5 :minutes]
:window/doc "Collects segments on a 30 minute window sliding every 5 minutes"}
:onyx.windowing.aggregation/count
The :onyx.windowing.aggregation/count
operation counts the number of
segments in the window.
{:window/id :count-segments
:window/task :identity
:window/type :fixed
:window/aggregation :onyx.windowing.aggregation/count
:window/window-key :event-time
:window/range [1 :hour]
:window/doc "Counts segments in one hour fixed windows"}
:onyx.windowing.aggregation/sum
The :sum
operation adds the values of :age
for all segments in the
window.
{:window/id :sum-ages
:window/task :identity
:window/type :fixed
:window/aggregation [:onyx.windowing.aggregation/sum :age]
:window/window-key :event-time
:window/range [1 :hour]
:window/doc "Adds the :age key in all segments in 1 hour fixed windows"}
:onyx.windowing.aggregation/min
The :min
operation retains the minimum value found for :age
. An
initial value must be supplied via :window/init
.
{:window/id :min-age
:window/task :identity
:window/type :fixed
:window/aggregation [:onyx.windowing.aggregation/min :age]
:window/init 100
:window/window-key :event-time
:window/range [30 :minutes]
:window/doc "Finds the minimum :age in 30 minute fixed windows, default is 100"}
:onyx.windowing.aggregation/max
The :max
operation retains the maximum value found for :age
. An
initial value must be supplied via :window/init
.
{:window/id :max-age
:window/task :identity
:window/type :fixed
:window/aggregation [:onyx.windowing.aggregation/max :age]
:window/init 0
:window/window-key :event-time
:window/range [30 :minutes]
:window/doc "Finds the maximum :age in 30 minute fixed windows, default is 0"}
:onyx.windowing.aggregation/average
The :average
operation maintains an average over :age
. The state is
maintained as a map with three keys - :n
, the number of elements, :sum
, the running sum,
and :average
, the running average.
{:window/id :average-age
:window/task :identity
:window/type :fixed
:window/aggregation [:onyx.windowing.aggregation/average :age]
:window/window-key :event-time
:window/range [30 :minutes]
:window/doc "Finds the average :age in 30 minute fixed windows, default is 0"}
:onyx.windowing.aggregation/collect-by-key
The :collect-by-key
operation maintains a collection of all segments
with a common key.
{:window/id :collect-members
:window/task :identity
:window/type :fixed
:window/aggregation [:onyx.windowing.aggregation/collect-by-key :team]
:window/window-key :event-time
:window/range [30 :minutes]
:window/doc "Collects all users on the same :team in 30 minute fixed windows"}
Grouping
All of the above aggregates have slightly different behavior when
:onyx/group-by-key
or :onyx/group-by-fn
are specified on the catalog
entry. Instead of maintaining a scalar value in the aggregate, Onyx
maintains a map. The keys of the map are the grouped values, and values
of the map are normal scalar aggregates.
For example, if you had the catalog entry set to :onyx/group-by-key
with value :name
, and you used a window aggregate of
:onyx.windowing.aggregation/count
, and you sent through segments
[{:name "john"} {:name "tiffany"} {:name "john"}]
, the aggregate map
would look like {"john" 2 "tiffany" 1}
.
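A sketch of that pairing might look like the following; the task name and peer counts are hypothetical, and mirror the grouping settings used in the Aggregation & State Management section:
;; Catalog entry: group incoming segments by :name.
{:onyx/name :count-names
 :onyx/fn :clojure.core/identity
 :onyx/type :function
 :onyx/group-by-key :name
 :onyx/flux-policy :recover
 :onyx/min-peers 2
 :onyx/batch-size 20}

;; Window: count segments per grouped :name value.
{:window/id :count-names-window
 :window/task :count-names
 :window/type :global
 :window/aggregation :onyx.windowing.aggregation/count
 :window/window-key :event-time}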
Window Specification
See the Information Model chapter for an exact specification of what values the Window maps need to supply. Here we will describe what each of the keys mean.
key name | description |
---|---|
:window/id | A unique identifier per window |
:window/task | The workflow task over which the window operates |
:window/type | Which type of window this is (fixed, sliding, etc) |
:window/aggregation | The aggregation function to apply, as described above. If this operation is over a key, this is a vector, with the second element being the key. |
:window/window-key | The key over which the range will be calculated |
:window/range | The span of the window |
:window/slide | The delay to wait to start a new window after the previous window |
:window/init | The initial value required for some types of aggregation |
:window/min-value | A strict minimum value that :window/window-key can take |
:window/doc | An optional docstring explaining the window’s purpose |
Triggers
In this section, we talk about Triggers. Triggers are a feature that interacts with Windows. Windows capture and bucket data over time. Triggers let you release the captured data in response to a variety of stimuli.
Summary
Windows capture data over time and place segments into discrete, possibly overlapping buckets. By itself, this is a relatively useless concept. In order to harness the information that has been captured and rolled up, we need to move it somewhere. Triggers let us interact with the state in each extent of a window.
Tip: Example project: aggregation
Trigger Types
Onyx ships a number of trigger implementations that can be used out of the box. Each trigger fires in response to a particular stimulus. All triggers implemented in Onyx core fire at task completion. We outline each here and show an example of each in action.
:timer
This trigger sleeps for a duration of :trigger/period
. When it is done
sleeping, the :trigger/sync
function is invoked with its usual
arguments. The trigger goes back to sleep and repeats itself.
{:trigger/window-id :collect-segments
:trigger/refinement :onyx.refinements/discarding
:trigger/on :onyx.triggers/timer
:trigger/period [3 :seconds]
:trigger/sync ::write-to-dynamo
:trigger/doc "Writes state to DynamoDB every 5 seconds, discarding intermediate state"}
:segment
Trigger wakes up in reaction to a new segment being processed. Trigger
only fires once every :trigger/threshold
segments. When the threshold
is exceeded, the count of new segments goes back to 0
, and the looping
proceeds again in the same manner.
{:trigger/window-id :collect-segments
:trigger/refinement :onyx.refinements/accumulating
:trigger/on :onyx.triggers/segment
:trigger/fire-all-extents? true
:trigger/threshold [5 :elements]
:trigger/sync ::write-to-stdout
:trigger/doc "Writes the window contents to stdout every 5 segments"}
:punctuation
Trigger wakes up in reaction to a new segment being processed. Trigger
only fires if :trigger/pred
evaluates to true
. The signature of
:trigger/pred
is of arity-2: trigger, state-event
. Punctuation
triggers are often useful to send signals through that indicate that
no more data will be coming through for a particular window of time.
{:trigger/window-id :collect-segments
:trigger/refinement :onyx.refinements/discarding
:trigger/on :onyx.triggers/punctuation
:trigger/pred ::trigger-pred
:trigger/sync ::write-to-stdout
:trigger/doc "Writes the window contents to std out if :trigger/pred is true for this segment"}
:watermark
Trigger wakes up in reaction to a new segment being processed. Trigger
only fires if the value of :window/window-key
in the segment exceeds
the upper-bound in the extent of an active window. This is a shortcut
function for a punctuation trigger that fires when any piece of data
has a time-based window key that is above another extent, effectively
declaring that no more data for earlier windows will be arriving.
{:trigger/window-id :collect-segments
:trigger/refinement :onyx.refinements/discarding
:trigger/on :onyx.triggers/watermark
:trigger/sync ::write-to-stdout
:trigger/doc "Writes the window contents to stdout when this window's watermark has been exceeded"}
:percentile-watermark
Trigger wakes up in reaction to a new segment being processed. Trigger
only fires if the value of :window/window-key
in the segment exceeds
the lower-bound plus the percentage of the range as indicated by
:trigger/watermark-percentage
, a double
greater than 0
and less
than 1
. This is an alternative to :watermark
that allows you to
trigger on most of the data arriving, not necessarily every last
bit.
{:trigger/window-id :collect-segments
:trigger/refinement :onyx.refinements/discarding
:trigger/on :onyx.triggers/percentile-watermark
:trigger/watermark-percentage 0.95
:trigger/sync ::write-to-stdout
:trigger/doc "Writes the window contents to stdout when this window's watermark is exceeded by 95% of its range"}
Refinement Modes
A refinement mode allows you to articulate what should happen to the state of a window extent after a trigger has been invoked.
Syncing
Onyx offers you the ultimate flexibility on what to do with your state
during a trigger invocation. Set :trigger/sync
to a fully qualified,
namespaced keyword pointing to a function on the classpath at runtime.
This function takes 5 arguments: the event map, the window map that this trigger is defined on, the trigger map, a state-event map, and the window state as an immutable value. Its return value is ignored.
This function is invoked when the trigger fires, and is used to do any arbitrary action with the window contents, such as sync them to a database. It is called once per window instance. In other words, if a fixed window exists with 5 instances, the firing of a Timer trigger will call the sync function 5 times. You can use lifecycles to supply any stateful connections necessary to sync your data. Supplied values from lifecycles will be available through the first parameter - the event map.
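A minimal sketch of such a sync function, matching the five-argument signature described above:
(defn write-to-stdout [event window trigger state-event state]
  ;; `state` is the immutable window contents at the moment the trigger fired.
  ;; A real implementation might write it to a database using a connection
  ;; supplied through lifecycles via the `event` map.
  (println "Window" (:window/id window) "state:" state))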
Trigger Specification
See the Information Model chapter for an exact specification of what values the Trigger maps need to supply. Here we will describe what each of the keys mean.
key name | description |
---|---|
:trigger/window-id | The :window/id of the window that this trigger operates over |
:trigger/refinement | Fully qualified namespaced keyword for the mode of refinement, e.g. :onyx.refinements/accumulating |
:trigger/on | Fully qualified namespaced keyword for the trigger called to determine whether to fire as a reaction, e.g. :onyx.triggers/segment |
:trigger/sync | Fully qualified namespaced keyword of a function to call with the state. |
:trigger/fire-all-extents? | When true, fires every extent of a window in response to a trigger. |
:trigger/doc | An optional docstring explaining the trigger’s purpose. |
Aggregation & State Management
This section discusses state management and fault tolerance used in windowing/streaming joins.
Summary
Onyx provides the ability to perform updates to a state machine for segments which are calculated over windows. For example, a grouping task may accumulate incoming values for a number of keys over windows of 5 minutes. This feature is commonly used for aggregations, such as summing values, though it can be used to build more complex state machines.
State Example
;; Task definition
{:onyx/name :sum-all-ages
:onyx/fn :clojure.core/identity
:onyx/type :function
:onyx/group-by-key :name
:onyx/flux-policy :recover
:onyx/min-peers 2
:onyx/batch-size 20}
;; Window definition
{:window/id :sum-all-ages-window
:window/task :sum-all-ages
:window/type :global
:window/aggregation [:your-sum-ns/sum :age]
:window/window-key :event-time
:window/range [1 :hour]
:window/doc "Adds the :age key in all segments in 1 hour fixed windows"}
As segments are processed, an internal state within the calculated window is updated. In this case we are trying to sum the ages of the incoming segments.
Window aggregations are defined by a map containing the following keys:
Key | Optional? | Description |
---|---|---|
:aggregation/init | true | Fn (window) to initialise the state. |
:aggregation/create-state-update | false | Fn (window, state, segment) to generate a serializable state machine update. |
:aggregation/apply-state-update | false | Fn (window, state, entry) to apply state machine update entry to a state. |
:aggregation/super-aggregation-fn | true | Fn (window, state-1, state-2) to combine two states in the case of two windows being merged. |
The :window/aggregation map in the :sum-all-ages window referenced above is defined as follows:
(ns your-sum-ns)
(defn sum-init-fn [window]
0)
(defn sum-aggregation-fn [window state segment]
; k is :age
(let [k (second (:window/aggregation window))]
[:set-value (+ state (get segment k))]))
(defn sum-application-fn [window state [changelog-type value]]
(case changelog-type
:set-value value))
;; sum aggregation referenced in window definition.
(def sum
{:aggregation/init sum-init-fn
:aggregation/create-state-update sum-aggregation-fn
:aggregation/apply-state-update sum-application-fn})
Let’s try processing some example segments using this aggregation:
[{:name "John" :age 49}
{:name "Madeline" :age 55}
{:name "Geoffrey" :age 14}]
Results in the following events:
Action | Result |
---|---|
Initial state | 0 |
Incoming segment | {:name "John" :age 49} |
Changelog entry | [:set-value 49] |
Applied to state | 49 |
Incoming segment | {:name "Madeline" :age 55} |
Changelog entry | [:set-value 104] |
Applied to state | 104 |
Incoming segment | {:name "Geoffrey" :age 14} |
Changelog entry | [:set-value 118] |
Applied to state | 118 |
This state can be emitted via triggers or another mechanism. By describing changelog updates as a vector with a log command, such as :set-value, the aggregation function can emit multiple types of state transition if necessary.
Fault Tolerance
To allow for full recovery after peer crashes, the window state must be replicated somewhere. As state updates occur, Onyx publishes the stream of changelog updates to a replicated log.
After the changelog entry is written to the replicated log, the segment is acked, ensuring that a segment is only cleared from the input source after the update to window states it caused has been fully written to the log. When a peer crash occurs, a new peer will be assigned to the task, and this peer will play back all of the changelog entries, and apply them to the state, starting with the initial state. As the changelog updates are read back in the same order that they were written, the full state will be recovered. Partial updates ensure that only minimal update data is written for each segment processed, while remaining correct on peer failure.
Exactly Once Aggregation Updates
Exactly once aggregation updates are supported via Onyx’s filtering
feature. When a task’s catalog has :onyx/uniqueness-key
set, this key
is looked up in the segment and used as an ID key to determine whether
the segment has been seen before. If it has previously been processed,
and state updates have been persisted, then the segment is not
re-processed. This key is persisted to the state log transactionally
with the window changelog updates, so that previously seen keys can be
recovered in case of a peer failure.
See the section "Exactly Once Side-Effects" for a discussion of why exactly once side-effects are impossible to achieve.
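As a sketch, an input task’s catalog entry might declare its uniqueness key like this; the task name, plugin, and :message-id key are hypothetical and depend on your data and input medium:
{:onyx/name :read-events
 :onyx/plugin :onyx.plugin.kafka/read-messages ; hypothetical input plugin
 :onyx/type :input
 :onyx/medium :kafka
 :onyx/uniqueness-key :message-id              ; key assumed to exist in your segments
 :onyx/batch-size 20
 :onyx/doc "Reads events, deduplicating on :message-id"}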
Considerations
In order to reduce memory consumption, uniqueness-key values are persisted to a local database, currently implemented with RocksDB. This database uses a bloom filter and a memory cache, allowing Onyx to avoid hitting disk for most filter key checks.
In order to prevent unbounded increase in the size of the filter’s disk consumption, uniqueness-key values are bucketed based on recency, and the oldest bucket is expired as the newest is filled.
Several configuration parameters are available for the RocksDB-based local filter. The most relevant of these for general configuration are :onyx.rocksdb.filter/num-ids-per-bucket and :onyx.rocksdb.num-buckets, which are the size and the number of the buckets referenced above.
Parameter | Description | Default |
---|---|---|
|
RocksDB block cache size in bytes. Larger caches reduce the chance that the peer will need to check for the presence of a uniqueness key on disk. Defaults to 100MB. |
104857600 |
|
Number of bloom filter bits to use per uniqueness key value |
10 |
|
Number of uniqueness key values that can exist in a RocksDB filter bucket. |
10000000 |
|
Number of rotating filter buckets
to use. Buckets are rotated every
|
10 |
|
RocksDB block size. May be worth tuning depending on the size of your uniqueness-key values. |
4096 |
|
Whether to use compression in the RocksDB filter. It is recommended that |
|
|
Temporary directory to persist uniqueness filtering data. |
/tmp/rocksdb_filter |
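A sketch of tuning the bucket parameters in the peer-config; the values shown are simply the defaults from the table above, and the exact key names should be checked against the Peer Configuration reference:
(def peer-config
  {:onyx/tenancy-id "my-tenancy"
   :onyx.rocksdb.filter/num-ids-per-bucket 10000000
   :onyx.rocksdb.num-buckets 10})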
Exactly Once Side-Effects
Side-effects resulting from a segment being processed may occur more than once; exactly once side-effects are impossible to achieve. Onyx guarantees that the window state updates resulting from a segment are performed exactly once, however any side-effects that occur as a result of the segment being processed cannot be guaranteed to occur only once.
BookKeeper Implementation
State update changelog entries are persisted to BookKeeper, a replicated log server. An embedded BookKeeper server is included with Onyx. You can either use the embedded server or run BookKeeper alongside Onyx in a separate process.
BookKeeper ensures that changelog entries are replicated to multiple nodes, allowing for the recovery of windowing states upon the crash of a windowed task.
By default, the Onyx BookKeeper replication is striped over 3 BookKeeper instances (the ensemble) and written to 3 instances (the quorum).
Running the embedded BookKeeper server
The embedded BookKeeper server can be started via the
onyx.api/start-env
api call, with an env-config where
:onyx.bookkeeper/server?
is true
.
When running on a single node, you may wish to use BookKeeper without
starting the multiple instances of BookKeeper required to meet the
ensemble and quorum requirements. In this case you may start a local
quorum (3) of BookKeeper servers by setting
:onyx.bookkeeper/local-quorum?
to true
.
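For example, a development env-config that starts the embedded server as a local quorum might look like the following; ports and ids are illustrative, reusing the keys shown in the Environment chapter:
(def env-config
  {:onyx/tenancy-id id
   :zookeeper/address "127.0.0.1:2182"
   :zookeeper/server? true
   :zookeeper.server/port 2182
   :onyx.bookkeeper/server? true
   :onyx.bookkeeper/local-quorum? true})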
Embedded BookKeeper Configuration Parameters
Parameter | Description | Default |
---|---|---|
|
Bool to denote whether to startup a BookKeeper instance on this node, for use in persisting Onyx state information. |
|
|
Directory to store BookKeeper’s ledger in. It is recommended that this is altered to somewhere fast, preferably on a different disk to the BookKeeper journal. |
|
|
Port to startup this node’s BookKeeper instance on. |
|
|
Ports to use for the local BookKeeper quorum. |
|
|
Directory to store BookKeeper’s journal in. It is recommended that this is altered to somewhere fast, preferably on a different disk to the BookKeeper ledger. |
|
|
Bool to denote whether to startup a full quorum of BookKeeper instances on this node. Important: for TEST purposes only. |
|
State Log Compaction
It is recommended that the state changelog is periodically compacted. When compaction occurs, the current state is written to a new ledger and all previous ledgers are swapped for the new compacted state ledger.
Compaction can currently only be performed within a task lifecycle for the windowed task. Be careful when choosing the condition (see YOUR-CONDITION in the example below), as compacting too often is likely to be expensive. Compacting once every X segments is a reasonably good choice of condition.
(def compaction-lifecycle
{:lifecycle/before-batch
(fn [event lifecycle]
(when YOUR-CONDITION
(state-extensions/compact-log (:onyx.core/state-log event) event @(:onyx.core/window-state event)))
{})})
BookKeeper Implementation Configuration
The BookKeeper state log implementation can be configured via the peer-config. Of particular note is :onyx.bookkeeper/ledger-password, which should generally be changed to a more secure value.
Parameter | Description | Default |
---|---|---|
|
Number of BookKeeper ledger entries to read at a time when recovering state. Effective batch read of state entries is write-batch-size * read-batch-size. |
50 |
|
Number of milliseconds to back off (sleep) after writing BookKeeper ledger id to the replica. |
50 |
|
Password to use for Onyx state persisted to BookKeeper ledgers. Highly recommended this is changed on cluster wide basis. |
INSECUREDEFAULTPASSWORD |
|
Tunable write throttle for BookKeeper ledgers. |
30000 |
|
Size of the buffer to which BookKeeper ledger writes are buffered via. |
10000 |
|
BookKeeper client timeout. |
60000 |
|
Number of state persistence writes to batch into a single BookKeeper ledger entry. |
20 |
|
The number of BookKeeper instances to which each entry will be written. For example, if you have a ledger-ensemble-size of 3 and a ledger-quorum-size of 2, the first write will be written to server1 and server2, the second write will be written to server2 and server3, etc. |
3 |
|
The number of BookKeeper instances over which entries will be striped. For example, if you have a ledger-ensemble-size of 3 and a ledger-quorum-size of 2, the first write will be written to server1 and server2, the second write will be written to server2 and server3, etc. |
3 |
|
Maximum amount of time to wait while batching BookKeeper writes, before writing the batch to BookKeeper. In the case of a full batch read, the timeout will not be hit. |
50 |
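For example, the ledger password mentioned above can be overridden in the peer-config; the value here is of course a placeholder:
(def peer-config
  {:onyx/tenancy-id "my-tenancy"
   :onyx.bookkeeper/ledger-password "choose-a-cluster-wide-secret"})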
Testing Onyx Jobs
In this chapter, we’ll cover what you need to know about testing your Onyx application code.
Overview
Onyx eschews customized abstractions for describing distributed programs. As a consequence, application code written on top of Onyx ends up being regular Clojure functions and data structures. Onyx remains out of the way, and you’re free to test your functions as you would any other Clojure program. It’s also useful to roll up your entire job and test it as it will run in production. This section is dedicated to giving you a set of idioms and tools for working with Onyx jobs during the development phase.
Automatic Resource Clean up
While it’s easy enough to run Onyx with an in-memory ZooKeeper instance
(see the Environment chapter for how to do this), there are a host of
development comforts that are missing when working on your Onyx code.
One key pain point is the clean shutdown of resources on job completion
or failure. Often when developing a new Onyx job, you’ll make lots of
mistakes and your job will be killed. Before the next time you run your
job, it’s a good idea to make sure your peers, peer group, and
development environment all cleanly shut down. This can be a moderately
tricky task at the repl where a Thread may be interrupted by a Control-C
sequence. To this end, Onyx provides an onyx.test-helper namespace with
namespace with
a handy macro known as with-test-env
.
with-test-env takes three parameters: a symbol to bind the development environment var to, a sequence of [number-of-peers-to-start, env-config, peer-config], and a body of expressions. This macro starts a development environment with the requested number of peers, runs the body of expressions, and cleanly shuts the development environment down when either the body of expressions completes or an exception is thrown (including InterruptedException). You’ll notice that this is an anaphoric macro: it creates a development environment, then binds it to the user-supplied symbol. Using this macro, you don’t need to worry about ports remaining open that should have been closed on shutdown. You can safely interrupt Onyx at any point in time during the job execution.
In practice, this looks something like the following:
(deftest my-onyx-job-test
(let [id (java.util.UUID/randomUUID)
config (load-config)
env-config (assoc (:env-config config) :onyx/tenancy-id id)
peer-config (assoc (:peer-config config) :onyx/tenancy-id id)]
(with-test-env [test-env [3 env-config peer-config]]
(let [catalog ...
workflow ...
lifecycles ...]
(onyx.api/submit-job peer-config
{:catalog catalog
:workflow workflow
:lifecycles lifecycles
:task-scheduler :onyx.task-scheduler/balanced})
(let [results (take-segments! ...)
expected ...]
(is (= expected results)))))))
Code Reloading
Another painful part of writing asynchronous code in general is the
matter of reloading code without restarting your repl. Something that
plays particularly well with Onyx is clojure.tools.namespace. A pattern
that we like to use is to create a user.clj
file for the developer
with the following contents:
(ns user
(:require [clojure.tools.namespace.repl :refer [refresh set-refresh-dirs]]))
(set-refresh-dirs "src" "test")
(defn init [])
(defn start [])
(defn stop [])
(defn go []
(init)
(start))
(defn reset []
(stop)
(refresh))
When you want to reload your code, invoke the (reset)
function. You
can supply any extra application specific code in init
, start
, and
stop
. Combining this pattern with the with-test-env
macro, you
should virtually never need to restart your repl while developing Onyx
code.
In-Memory I/O
Production-level Onyx jobs talk to input streams like Kafka, or databases like Postgres. It’s not always helpful to run those pieces of infrastructure while developing your job. Early on, we like to focus on what the shape of the data will look like, and use in-memory I/O with core.async instead of Kafka or whatever your production medium is. There’s plenty of documentation on how to actually use the core.async plugin. The big question is: how does one most effectively use core.async for development, and more realistic I/O targets for staging and production?
Our approach leverages Onyx’s all-data job specification. We’ve found it helpful to isolate the parts of the catalog and lifecycles that will vary between different environments and use a "builder" pattern. We start with a "base job" - the pieces of the job that are invariants across all environments:
(defn base-job [mode onyx-id task-scheduler]
(let [datomic-uri (my-env-helper/get-datomic-uri mode onyx-id)]
{:workflow wf/parse-event-workflow
:catalog (cat/build-parse-event-catalog datomic-uri)
:flow-conditions (fcp/parser-flow-conditions :parse-log
[:write-parse-failures]
[:write-to-datomic])
:lifecycles sl/base-parse-event-lifecycles
:task-scheduler task-scheduler}))
The function which builds the base job takes a "mode", indicating what
environment we should construct. We like to use keywords - things like
:dev
, :staging
, :prod
. Functions that receive these keywords are
often multimethods which dispatch on mode, building the appropriate
configuration files. In this example, we use the mode parameter to vary
which Datomic URI we’re going to use in our catalog.
Next, we add in environment-specific code using a little utility function to update the base job that’s passed in as a parameter:
(defn add-metrics [job m-config]
(my-fun-utilities/add-to-job job {:lifecycles
(metrics/build-metrics-lifecycles
(:riemann/host m-config)
(:riemann/port m-config))}))
(defn add-logging [job]
(my-fun-utilities/add-to-job job {:lifecycles sl/log-parse-event-lifecycles}))
Finally, we put them all together with a simple cond->:
(let [mode :dev
log? true
metrics? false]
(cond-> (base-job mode onyx-id task-scheduler)
log? (add-logging)
metrics? (add-metrics m-config)))
It’s important to remember that we’re working with plain Clojure data here. The sky is the limit on how you can put the building blocks together!
Deployment
Onyx has no built-in mechanism for deployment. Rather, we let you deploy at your comfort. We’ll describe some approaches to doing this.
Deployment Style
Unlike Hadoop and Storm, Onyx does not have a built-in deployment feature. To deploy your application, you need to uberjar your program and place it on every node in your cluster. Start up the uberjar, passing it your shared Onyx ID and ZooKeeper address. Once it connects, you’ve successfully deployed!
We’ve chosen not to build in a deployment strategy because there are so many flexible approaches to handling deployment. S3, Docker, Mesos, Swarm, and Kubernetes are a few good choices. We’ll describe some of these strategies below.
Shared File System
Perhaps the most primitive deployment that you can use is a shared file system. Write a small script to SCP your uberjar to each of the nodes in your cluster and start the jar. You might use S3 for this, and a utility that allows you to manipulate a few SSH sessions in parallel. This is great for getting started on 3 nodes or so. You’ll want to use something a bit more elaborate as you go to production and get bigger, though.
Docker
We recommend packaging your uberjar into a Docker container. This is useful for locking in a specific Java version. Make sure you expose the ports that peers will be communicating on (this is configurable, see the peer configuration chapter). Once you have that down, you can upload your Docker image to DockerHub and use a simple script to pull down your Docker image and begin its execution. This is another step in the right direction, but read on to remove the scripting part of this task for real production systems.
Mesos and Marathon
Mesos and Marathon are a pair of applications that work together to manage your entire cluster of machines. I recommend deploying your Docker image onto Marathon, which will allow you to scale at runtime and avoid any scripting. Marathon ensures that if your Docker container goes down, it will be restarted. This is one of our favorite solutions for production scale, cluster-wide management at the moment.
Production Check List
A production check list is included in the Environment documentation.
Monitoring
When setting up an Onyx cluster in production, it’s helpful to know what Onyx itself is doing. Onyx exposes a set of callbacks that are triggered on certain actions.
Monitoring Hooks
When you’re taking Onyx to production, it’s not enough to know what your application-specific code is doing. You need to have insight into how Onyx is operating internally to be able to tune performance at an optimal level. Onyx allows you to register callbacks that are invoked when critical sections of code are executed in Onyx, returning a map that includes latency and data load size if appropriate.
Callback Specification
Callback functions that are given to Onyx for monitoring take exactly two parameters. The first parameter is the monitoring configuration, the second is a map. The return value of the function is ignored. The event names and keys in their corresponding maps are listed in a later section of this chapter, as well as a discussion about monitoring configuration. Here’s an example of a callback function:
(defn do-something [config {:keys [event latency bytes]}]
;; Write some metrics to Riemann or another monitoring service.
(send-to-riemann event latency bytes))
Registering Callbacks
To register callbacks, create a map of event name key to callback
function. This map must have a key :monitoring
, mapped to keyword
:custom
. If you want to ignore all callbacks, you can instead supply
:no-op
. Monitoring is optional, so you can skip any monitoring code
entirely if you don’t want to use this feature.
A registration example might look like:
(defn std-out [config event-map]
;; `config` is the `monitoring-config` var.
(prn event-map))
(def monitoring-config
{:monitoring :custom
:zookeeper-write-log-entry std-out
:zookeeper-read-log-entry std-out
:zookeeper-write-catalog std-out
:zookeeper-write-workflow std-out
:zookeeper-write-flow-conditions std-out
:zookeeper-force-write-chunk std-out
:zookeeper-read-catalog std-out
:zookeeper-read-lifecycles std-out
:zookeeper-gc-log-entry std-out})
;; Include the monitoring config in the peer config passed to `start-peer-group`.
(def peer-group (onyx.api/start-peer-group (assoc peer-config :monitoring-config monitoring-config)))
Monitoring Events
Onyx emits a monitoring event for each critical internal operation, such as the ZooKeeper reads and writes shown in the registration example above. The map passed to your callback identifies the event and, where appropriate, includes the operation’s latency and the size of the data involved; the event names themselves readily identify what has taken place to trigger the callback.
Performance Tuning
This chapter details a few tips for getting your Onyx cluster to run as fast as possible.
Onyx
-
Use Clojure 1.7+. 1.7 and 1.8 have some considerable performance enhancements compared to 1.6.
-
SET UP METRICS. We cannot stress this point enough. Please read the guide at onyx-metrics to get started.
-
Set the timbre log level to elide logging calls in production. This should be done at compile time, i.e. when AOT compiling, or when running via Leiningen. You can do this using the environment variable:
TIMBRE_LOG_LEVEL=info command_to_start_your_peers_or_compile_AOT
. -
Turn off Clojure assertions. Onyx uses assertions liberally to find errors early, however these do have an associated cost. Use
:global-vars {*assert* false}
in your Leiningen profile, or (set! *assert* false)
in your system bootup namespace. -
Use
JVM_OPTS="-server"
in production. -
Use
JVM_OPTS="-XX:+UseG1GC"
. The G1GC has more predictable performance that can improve latency, though may decrease throughput by a small amount. It is strongly recommended you use this GC, unless you have a reason not to. -
Check that you do not use reflection in any of the code running in your Onyx tasks’ critical paths, e.g. in onyx/fn, before/after-batch lifecycles, aggregations, etc. You can check for reflection by using
:global-vars {*warn-on-reflection* true}
in your lein profile. -
Tweak the
:onyx/batch-size
for the job’s tasks. A batch size of 20-100 segments is a reasonable start that will amortise the cost of onyx’s task lifecycle vs latency. -
For small segments, batch multiple segments into a single segment, and treat each new segment as a rolled up batch.
-
Tweak the batch timeout in each catalog entry to trade off increased latency for higher throughput. Counterintuitively, note that increased batch sizes can actually decrease average case latency, as improved throughput can decrease queuing effects.
-
The peer-config option
:onyx.messaging/allow-short-circuit?
(Peer Config), should be set to true in production. -
Profile your code locally using Java Flight Recorder / Mission Control. The easiest way to get started is to create a benchmark test, and the following JVM_OPTS
-XX:+UnlockCommercialFeatures -XX:+FlightRecorder -XX:+UnlockDiagnosticVMOptions -XX:StartFlightRecording=duration=1080s,filename=localrecording.jfr
. This will save a file named localrecording.jfr, which you can open in Mission Control (which can be started via the command jmc)
. You will need Oracle’s JVM to use this functionality. -
Start roughly 1 virtual peer per core per machine. When using Aeron messaging,
cores = virtual peers + subscribers
is a good guideline. This recommendation is a good starting point, however may not hold true when some virtual peers are largely idle, or spend much of their time I/O blocked. -
Use a custom compression scheme, rather than Nippy. You can configure custom compression/decompression functions via the peer configuration.
-
Increase the number of acker peers through the peer configuration as your cluster gets larger.
-
Tune the number of Aeron subscriber threads, if serialization is a large proportion of work performed in tasks.
ZooKeeper
-
Put the ZooKeeper journal on its own physical volume
ZooKeeper tends not to get a huge amount of traffic, so this probably won’t offer a huge performance boost. It’s helpful if you’re trying to make your processing latency as predictable as possible, though.
Messaging
Aeron
Ensure you disable the embedded media driver, and instead use an independent media driver (see Media Driver)
When testing performance with a single node using the Aeron messaging layer, connection short circuiting may cause very misleading results.
The peer-config option
:onyx.messaging/allow-short-circuit?
(Peer Config) should be set to false for realistic
performance testing when only a single node is available for testing. Ensure
this option is set to true when operating in production.
Please refer to the Aeron messaging section for general discussion of the Aeron messaging implementation and its characteristics.
Environment
In this chapter, we’ll discuss what you need to set up development and production environments.
Development Environment
Explanation
One of the primary design goals of Onyx is to make the development environment as close as possible to production - without making the developer run a lot of services locally. A development environment in Onyx merely needs Clojure 1.7+ to operate. A ZooKeeper server is spun up in memory via Curator, so you don’t need to install ZooKeeper locally if you don’t want to.
Production Environment
Multi-node & Production Checklist
Congratulations! You’re going to production, or at least testing your Onyx jobs with a multi-node setup.
We strongly recommend you run through this checklist before you do so, as it will likely save you a lot of time.
-
❏ Ensure your JVM is running with JVM opts -server: performance will be greatly decreased if you do not run Onyx with at least
-server
JVM opts. -
❏ Disable embedded ZooKeeper: when
onyx.api/start-env
is called with a config where:zookeeper/server? true
, it will start an embedded ZooKeeper.:zookeeper/server?
should be set tofalse
in production. -
❏ Setup production ZooKeeper: A full ZooKeeper ensemble should be used in lieu of the testing ZooKeeper embedded in Onyx.
-
❏ Increase maximum ZK connections Onyx establishes a large number of ZooKeeper connections, in which case you will see an exception like the following:
WARN org.apache.zookeeper.server.NIOServerCnxn: Too many connections from /127.0.0.1 - max is 10
. Increase the number of connections in zoo.cfg, viamaxClientCnxns
. This should be set to a number moderately larger than the number of virtual peers that you will start. -
❏ Configure the ZooKeeper address to point to an ensemble:
:zookeeper/address
should be set in your peer-config e.g.:zookeeper/address "server1:2181,server2:2181,server3:2181"
. -
❏ Ensure all nodes are using the same
:onyx/tenancy-id
::onyx/tenancy-id
in the peer-config is used to denote which cluster a virtual peer should join. If all your nodes do not use the same:onyx/tenancy-id
, they will not be a part of the same cluster and will not run the same jobs. Any jobs submitted to a cluster must also use the same :onyx/tenancy-id
to ensure that cluster runs the job. -
❏ Do not use core.async tasks: switch all input or output tasks away from core.async, as it is not a suitable medium for multi-node use and will result in many issues when used in this way. The Kafka plugin is one recommended alternative.
-
❏ Test on a single node without short circuiting: when
:onyx.messaging/allow-short-circuit?
istrue
, Aeron messaging will be short circuited completely. To test messaging on a local mode as if it’s in production, set:onyx.messaging/allow-short-circuit? false
. -
❏ Ensure short circuiting is enabled in production: short circuiting improves performance by locally messaging where possible. Ensure
:onyx.messaging/allow-short-circuit? true
is set in the peer-config on production. -
❏ Set messaging bind address: the messaging layer must be bound to the network interface used by peers to communicate. To do so, set
:onyx.messaging/bind-addr
in peer-config to a string defining the interface’s IP. On AWS, this IP can easily be obtained via(slurp "http://169.254.169.254/latest/meta-data/local-ipv4")
. -
❏ Is your bind address external facing?: If your bind address is something other than the one accessible to your other peers (e.g. docker, without net=host), then you will need to define an external address to advertise. This can be set via
:onyx.messaging/external-addr
in peer-config. -
❏ Open UDP ports for Aeron: Aeron requires the port defined in
:onyx.messaging/peer-port
to be open for UDP traffic. -
❏ Setup an external Aeron Media Driver: If messaging performance is a factor, it is recommended that the Aeron Media Driver is run out of process. First, disable the embedded driver by setting
:onyx.messaging.aeron/embedded-driver? false
. An example out of process media driver is included in lib-onyx. This media driver can be started vialein run -m
, or via an uberjar, each by referencing the correct namespace, which contains a main entry point. Ensure that the media driver is started with JVM opts-server
. -
❏ Setup metrics: when in production, it is essential that you are able to measure retried messages, input message complete latency, throughput and batch latency. Setup Onyx to use onyx-metrics. We recommend at very least using the timbre logging plugin, which is easy to setup.
ZooKeeper
Environment Launch of In-Memory ZooKeeper
To launch an in-memory ZooKeeper instance, add :zookeeper/server? true
to the environment options. Also, specify
:zookeeper.server/port <my port>
so that Curator knows what port to
start running the server on.
If your deployment throws an exception and doesn’t shut down ZooKeeper, it will remain open. Firing up the environment again will cause a port collision, so be sure to restart your repl in that case.
Example
Here’s an example of using ZooKeeper in-memory, with some non-ZooKeeper required parameters elided.
(def env-config
{:zookeeper/address "127.0.0.1:2182"
:zookeeper/server? true
:zookeeper.server/port 2182
:onyx/tenancy-id id})
(def peer-opts
{:zookeeper/address "127.0.0.1:2182"
:onyx/tenancy-id id})
Networking / Firewall
Messaging requires the UDP port set via :onyx.messaging/peer-port to be open.
All peers require the ability to connect to the ZooKeeper instances over TCP.
Example
Notice that all we’re doing is extending the address string to include more host:port pairs. This uses the standard ZooKeeper connection string, so you can use authentication here too if you need it.
(def peer-opts
{...
:zookeeper/address "10.132.8.150:2181,10.132.8.151:2181,10.132.8.152:2181"
...})
Peer Configuration
This chapter describes all of the options available to configure the virtual peers and the development environment.
Environment Only
Peer Only
Base Configuration
:onyx.peer/inbox-capacity
Maximum number of messages to try to prefetch and store in the inbox, since reading from the log happens asynchronously.
:onyx.peer/outbox-capacity
Maximum number of messages to buffer in the outbox for writing, since writing to the log happens asynchronously.
:onyx.peer/retry-start-interval
Number of ms to wait before trying to reboot a virtual peer after failure.
:onyx.peer/drained-back-off
Number of ms to wait before trying to complete the job if all input tasks have been exhausted. Completing the job may not succeed if the cluster configuration is being shifted around.
:onyx.peer/peer-not-ready-back-off
Number of ms to back off and wait before retrying the call to start-task?
lifecycle hook if it returns false.
:onyx.peer/job-not-ready-back-off
Number of ms to back off and wait before trying to discover configuration needed to start the subscription after discovery failure.
:onyx.peer/join-failure-back-off
Number of ms to wait before trying to rejoin the cluster after a previous join attempt has aborted.
:onyx.peer/fn-params
A map of keywords to vectors. Keywords represent task names, vectors represent the first parameters to apply
to the function represented by the task. For example, {:add [42]}
for task :add
will call the function
underlying :add
with (f 42 <segment>)
.
:onyx.peer/tags
Tags which denote the capabilities of this peer in terms of user-defined functionality. Must be specified as a vector of keywords. This is used in combination with :onyx/required-tags
in the catalog to force tasks to run on certain sets of machines.
:onyx.zookeeper/backoff-base-sleep-time-ms
Initial amount of time to wait between ZooKeeper connection retries.
:onyx.peer/backpressure-low-water-pct
Percentage of messaging inbound-buffer-size that constitutes a low water mark for backpressure purposes.
:onyx.peer/backpressure-high-water-pct
Percentage of messaging inbound-buffer-size that constitutes a high water mark for backpressure purposes.
:onyx.peer/backpressure-check-interval
Number of ms between checking whether the virtual peer should notify the cluster of backpressure-on/backpressure-off.
:onyx.messaging/inbound-buffer-size
Number of messages to buffer in the core.async channel for received segments.
:onyx.messaging/completion-buffer-size
Number of messages to buffer in the core.async channel for completing messages on an input task.
:onyx.messaging/release-ch-buffer-size
Number of messages to buffer in the core.async channel for released completed messages.
:onyx.messaging/retry-ch-buffer-size
Number of messages to buffer in the core.async channel for retrying timed-out messages.
:onyx.messaging/peer-link-gc-interval
The interval in milliseconds to wait between closing idle peer links.
:onyx.messaging/peer-link-idle-timeout
The maximum amount of time that a peer link can be idle (not looked up in the state atom for usage) before it is eligible to be closed. The connection will be reopened from scratch the next time it is needed.
:onyx.messaging/ack-daemon-timeout
Number of milliseconds that an ack value can go without being updated on a daemon before it is eligible to time out.
:onyx.messaging/ack-daemon-clear-interval
Number of milliseconds between runs of the periodic process that clears out ack-vals that have timed out in the daemon.
:onyx.messaging/decompress-fn
The Clojure function to use for messaging decompression. Receives one argument - a byte array. Must return the decompressed value of the byte array.
:onyx.messaging/compress-fn
The Clojure function to use for messaging compression. Receives one argument - a sequence of segments. Must return a byte array representing the segment seq.
:onyx.messaging/bind-addr
An IP address to bind the peer to for messaging. Defaults to nil, in which case the peer binds to its external IP as determined by calling http://checkip.amazonaws.com.
:onyx.messaging/allow-short-circuit?
A boolean denoting whether to allow virtual peers to short circuit networked messaging when colocated with another virtual peer. Short circuiting allows for direct transfer of messages to a virtual peer’s internal buffers, which improves performance where possible. This configuration option is primarily for use in performance testing, as peers will not generally be able to short circuit messaging after scaling to many nodes.
:onyx.messaging.aeron/embedded-driver?
A boolean denoting whether an Aeron media driver should be started up with the environment. See Aeron Media Driver for an example for how to start the media driver externally.
:onyx.messaging.aeron/subscriber-count
The number of Aeron subscriber threads that receive messages for the peer-group. As peer-groups are generally configured per-node (machine), this setting can bottleneck receive performance if many virtual peers are used per-node, or are receiving and/or de-serializing large volumes of data. A good guideline is num cores = num virtual peers + num subscribers, assuming virtual peers are generally being fully utilised.
:onyx.messaging.aeron/write-buffer-size
Size of the write queue for the Aeron publication. Writes to this queue will currently block once full.
:onyx.messaging.aeron/poll-idle-strategy
The Aeron idle strategy to use when polling for new messages. Currently, two choices, :high-restart-latency and :low-restart-latency, are available. :low-restart-latency may result in lower message latency, at the cost of higher CPU usage and potentially reduced throughput.
:onyx.messaging.aeron/offer-idle-strategy
The Aeron idle strategy to use when offering messages to another peer. Currently, two choices, :high-restart-latency and :low-restart-latency, are available. :low-restart-latency may result in lower message latency, at the cost of higher CPU usage and potentially reduced throughput.
Peer Full Example
(def peer-opts
{:onyx/tenancy-id "df146eb8-fd6e-4903-847e-9e748ca08021"
:zookeeper/address "127.0.0.1:2181"
:onyx.peer/inbox-capacity 2000
:onyx.peer/outbox-capacity 2000
:onyx.peer/retry-start-interval 4000
:onyx.peer/join-failure-back-off 500
:onyx.peer/drained-back-off 400
:onyx.peer/peer-not-ready-back-off 5000
:onyx.peer/job-not-ready-back-off 1000
:onyx.peer/fn-params {:add [42]}
:onyx.peer/zookeeper-timeout 10000
:onyx.messaging/completion-buffer-size 2000
:onyx.messaging/release-ch-buffer-size 50000
:onyx.messaging/retry-ch-buffer-size 100000
:onyx.messaging/ack-daemon-timeout 90000
:onyx.messaging/ack-daemon-clear-interval 15000
:onyx.messaging/decompress-fn onyx.compression.nippy/decompress
:onyx.messaging/compress-fn onyx.compression.nippy/compress
:onyx.messaging/impl :aeron
:onyx.messaging/bind-addr "localhost"
:onyx.messaging/peer-port-range [50000 60000]
:onyx.messaging/peer-ports [45000 45002 42008]})
Information Model
The information model is described in the Cheat Sheet. It is also available as a nested map.
Scheduling
Onyx offers fine-grained control of how many peers are allocated to particular jobs and tasks. This section outlines how to use the built-in schedulers.
Allocating Peers to Jobs and Tasks
In a masterless design, there is no single entity that assigns tasks to peers. Instead, peers need to contend for tasks to execute as jobs are submitted to Onyx. Conversely, as peers are added to the cluster, the peers must "shift" to distribute the workload across the cluster. Onyx ships with out-of-the-box job and task allocation policies. End users can change the levels of fairness that each job gets with respect to cluster power. And remember, one virtual peer executes at most one task.
Job Schedulers
Each running Onyx instance is configured with exactly one job scheduler.
The purpose of the job scheduler is to coordinate which jobs peers are
allowed to volunteer to execute. There are a few different kinds of
schedulers, listed below. To use each, configure the Peer Options map
with key :onyx.peer/job-scheduler
to the value specified below.
Greedy Job Scheduler
The Greedy job scheduler allocates all peers to each job in the order that it was submitted. For example, suppose you had 100 virtual peers and you submitted two jobs - job A and job B. With a Greedy scheduler, all 100 peers would be allocated to job A. If job A completes, all 100 peers will then execute tasks for job B. This probably isn’t desirable when you’re running streaming workflows, since they theoretically never end.
To use this scheduler, set key :onyx.peer/job-scheduler
to
:onyx.job-scheduler/greedy
in the Peer Options.
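For instance, a minimal sketch of the relevant peer option:
(def peer-opts
  {;; ... other required peer options ...
   :onyx.peer/job-scheduler :onyx.job-scheduler/greedy})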
Peer Addition
In the event that a peer joins the cluster while the Greedy job scheduler is running, that new peer will be allocated to the current job that is being run greedily.
Peer Removal
In the event that a peer leaves the cluster while the Greedy job scheduler is running, no peers will be shifted off of the job that is greedily running.
Job Addition
If a job is submitted while this scheduler is running, no peers will be allocated to this job. The only exception to this rule is if no jobs are currently running. In this case, all peers will be allocated to this job.
Job Removal
If a job is completed or otherwise canceled, all of the peers that were executing that job will move to the job that was submitted after it.
Balanced Job Scheduler
The Balanced job scheduler allocates peers in a rotating fashion to jobs that were submitted. For example, suppose that you had 100 virtual peers (virtual peer 1, virtual peer 2, … virtual peer 100) and you submitted two jobs - job A and job B. With a Balanced scheduler, both jobs will end up with 50 virtual peers allocated to each. This scheduler begins allocating by selecting the first job submitted.
To use this scheduler, set key :onyx.peer/job-scheduler
to
:onyx.job-scheduler/balanced
in the Peer Options.
Peer Addition
In the event that a peer joins the cluster while the Balanced scheduler is running, that new peer will be allocated to the job that most evenly balances the cluster according to the number of jobs divided by the number of peers. If there is a tie, the new peer is added to the earliest submitted job.
Peer Removal
In the event that a peer leaves the cluster while the Balanced scheduler is running, the peers across all jobs will be rebalanced to evenly distribute the workload.
Job Addition
If a job is submitted while this scheduler is running, the entire cluster will be rebalanced. For example, if job A has all 100 peers executing its task, and job B is submitted, 50 peers will move from job A to job B.
Job Removal
If a job is completed or otherwise canceled while this scheduler is running, the entire cluster will be rebalanced. For example, if job A, B, and C had 20 peers executing each of its tasks (60 peers total), and job C finished, job A would gain 10 peers, and job B would gain 10 peers.
Percentage Job Scheduler
The Percentage job scheduler allows jobs to be submitted with a percentage value. The percentage value indicates what percentage of the cluster will be allocated to this job. The use case for this scheduler is when you have a static number of jobs and a varying number of peers. For example, if you have 2 jobs - A and B - you’d give each of them a percentage value - say 70% and 30%, respectively. If you had 100 virtual peers running, 70 would be allocated to A, and 30 to B. If you then added 100 more peers to the cluster, job A would be allocated 140 peers, and job B 60. This dynamic scaling is a big step forward over statically configuring slots, which is the norm in ecosystems like Hadoop and Storm.
If there aren’t enough peers to satisfy the percentage values of all the jobs, this scheduler will allocate with priority to jobs with the highest percentage value. When percentage values are equal, the earliest submitted job will get priority. In the event that jobs are submitted, and the total percentage value exceeds 100%, the earliest submitted jobs that do not exceed 100% will receive peers. Jobs that go beyond it will not receive peers. For example, if you submitted jobs A, B, and C with 70%, 30%, and 20% respectively, jobs A and B would receive peers, and C will not be allocated any peers until either A or B completes.
If the total percentages of all submitted jobs doesn’t sum up to 100%, the job with the highest percentage value will receive the extra peers. When percentage values are equal, the earliest submitted job will get priority.
If the algorithm determines that any job should receive a number of peers that is less than the minimum number it needs to execute, that job receives no peers.
To use this scheduler, set key :onyx.peer/job-scheduler
to
:onyx.job-scheduler/percentage
in the Peer Options.
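As a sketch, assuming the job's share is declared with a :percentage key in the job map passed to submit-job (workflow and catalog defined elsewhere):
(def peer-opts
  {;; ... other required peer options ...
   :onyx.peer/job-scheduler :onyx.job-scheduler/percentage})

;; Ask for 70% of the cluster's peers for this job.
(onyx.api/submit-job
 peer-opts
 {:workflow workflow
  :catalog catalog
  :percentage 70
  :task-scheduler :onyx.task-scheduler/balanced})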
Peer Addition
In the event that a peer joins the cluster while the Percentage job scheduler is running, the entire cluster will rebalance.
Peer Removal
In the event that a peer leaves the cluster while the Percentage job scheduler is running, the entire cluster will rebalance.
Job Addition
If a job is submitted while this scheduler is running, the entire cluster will be rebalanced.
Job Removal
If a job is completed or otherwise canceled while this scheduler is running, the entire cluster will be rebalanced.
Task Schedulers
Each Onyx job is configured with exactly one task scheduler. The task
scheduler is specified at the time of calling submit-job
. The purpose
of the task scheduler is to control the order in which available peers
are allocated to which tasks. There are a few different Task Scheduler
implementations, listed below. To use each, call onyx.api/submit-job
.
The second argument of this function is a map. Supply a
:task-scheduler
key and map it to the value specified below.
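For example, a minimal sketch of a submit-job call, with workflow, catalog, and lifecycles assumed to be defined elsewhere:
(onyx.api/submit-job
 peer-opts
 {:workflow workflow
  :catalog catalog
  :lifecycles lifecycles
  :task-scheduler :onyx.task-scheduler/balanced})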
Balanced Task Scheduler
The Balanced Scheduler takes a topological sort of the workflow for a specific job. As peers become available, this scheduler assigns tasks to peers in a rotating order. For example, if a workflow has a topological sort of tasks A, B, C, and D, this scheduler assigns each peer to tasks A, B, C, D, A, B, C, D, … and so on.
To use, set :task-scheduler
in submit-job
to
:onyx.task-scheduler/balanced
.
Peer Removal
If a peer fails, or is otherwise removed from the cluster, the Task scheduler defers to the Job scheduler to rebalance the cluster. If a new peer is added to this task as a result of a peer failing in another job, it is assigned the next task in the balanced sequence.
Max Peer Parameter
With the Balanced Task Scheduler, each entry in the catalog can specify
a key of :onyx/max-peers
with an integer value > 0. When this key is
set, Onyx will never assign that task more than that number of peers.
Balanced will simply skip that task for allocation when more peers are
available, and continue assigning peers to other tasks in a balanced fashion.
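For example, a sketch of a catalog entry (the task name and function are hypothetical) that caps a task at 5 peers:
{:onyx/name :process-events
 :onyx/fn :my.app/process-event
 :onyx/type :function
 :onyx/max-peers 5 ;; never allocate more than 5 peers to this task
 :onyx/batch-size 20}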
Percentage Task Scheduler
The Percentage Scheduler takes a set of tasks, all of which must be
assigned a percentage value (:onyx/percentage
) in the corresponding
catalog entries. The percentage values must add up to 100 or less.
Percent values may be integers between 1 and 99, inclusive. This
scheduler will allocate peers for this job in proportion to the
percentages specified for each task. As more or fewer peers join the
cluster, allocations will automatically scale. For example, if a job has
tasks A, B, and C with 70%, 20%, and 10% specified as their percentages,
and there are 10 peers, task A receives 7 peers, B 2 peers, and C 1 peer.
This scheduler handles corner cases (fractions of peers) in the same way as the Percentage Job Scheduler. See that documentation for a full description.
To use, set :task-scheduler
in submit-job
to
:onyx.task-scheduler/percentage
.
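A sketch of catalog entries whose percentages sum to 100; task names, plugins, and functions are hypothetical:
[{:onyx/name :read-lines
  :onyx/plugin :onyx.plugin.core-async/input
  :onyx/type :input
  :onyx/medium :core.async
  :onyx/percentage 70 ;; roughly 70% of this job's peers
  :onyx/batch-size 20}
 {:onyx/name :process
  :onyx/fn :my.app/process
  :onyx/type :function
  :onyx/percentage 20
  :onyx/batch-size 20}
 {:onyx/name :write-lines
  :onyx/plugin :onyx.plugin.core-async/output
  :onyx/type :output
  :onyx/medium :core.async
  :onyx/percentage 10
  :onyx/batch-size 20}]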
Peer Removal
If a peer fails, or is otherwise removed from the cluster, the Task scheduler rebalances all the peers for even distribution.
Colocation Task Scheduler
The Colocation Scheduler takes all of the tasks for a job and, if possible, assigns them to the peers on a single physical machine represented by the same peer group. If a job has 4 tasks and the cluster is one machine with 5 peers, 4 peers will become active. If that machine had 8 peers, all 8 would become active, as this scheduler operates in peer chunks that are divisible by the task size. If more machines are capable of executing the entire job, they will also be used.
This scheduler is useful for dramatically increasing the performance of jobs whose latency is bound by transmitting data over the network between tasks. Using this scheduler with peer short circuiting will ensure that segments are never serialized and never cross the network between tasks (with the exception of grouping tasks). Onyx’s usual fault tolerance mechanisms are still used to ensure that data is processed in the presence of machine failure.
To use, set :task-scheduler
in submit-job
to
:onyx.task-scheduler/colocated
.
To use colocation, but to disable the scheduler’s affinity to always
send segments to a local peer, set
:onyx.task-scheduler.colocated/only-send-local?
to false
in the peer
config. This is desirable when optimal performance depends on the
uniformity of tasks being evenly assigned to machines in your cluster,
but strictly local execution is not helpful for performance.
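For example, a minimal sketch of the peer config:
(def peer-opts
  {;; ... other required peer options ...
   :onyx.task-scheduler.colocated/only-send-local? false})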
Peer Addition
If a peer is added to the cluster and its machine is capable of executing all the tasks for this job, the entire machine will be used - provided that it falls into the pool of peers eligible to execute this job, per the job scheduler’s prerogative.
Peer Removal
If a peer is removed, all the peers associated with this job’s tasks for this chunk of peers will stop executing their tasks.
Tags
It’s often the case that a set of machines in your cluster are privileged in some way. Perhaps they are running special hardware, or they live in a specific data center, or they have a license to use a proprietary database. Sometimes, you’ll have Onyx jobs that require tasks to run on a predetermined set of machines. Tags are a feature that let peers denote "capabilities". Tasks may declare which tags peers must have in order to be selected to execute them.
Peers
To declare a peer as having special capabilities, use a vector of
keywords in the Peer Configuration under key :onyx.peer/tags
. For
example, if you wanted to declare that a peer has a license for its JVM
to communicate with Datomic, you might add this to your Peer
Configuration:
{...
:onyx/tenancy-id "my-cluster"
:onyx.peer/tags [:datomic]
...
}
You can specify multiple tags. The default is no tags ([]
), in which
case this peer can execute any tasks that do not require tags.
Tasks
Now that we have a means for expressing which peers can do which kinds
of things, we’ll need a way to express which tasks require which
capabilities. We do this in the catalog. Any task can use the key
:onyx/required-tags
with a vector of keywords as a value. Any peer
that executes this task is guaranteed to have :onyx/required-tags
as a
subset of its :onyx.peer/tags
.
For example, to declare that task :read-datoms
must be executed by a
peer that can talk to Datomic, you might write:
[{:onyx/name :read-datoms
:onyx/plugin :onyx.plugin.datomic/read-datoms
:onyx/type :input
:onyx/medium :datomic
:onyx/required-tags [:datomic] ;; <- Add this!
:datomic/uri db-uri
...
:onyx/batch-size batch-size
:onyx/doc "Reads a sequence of datoms from the d/datoms API"}
...
]
Event Subscription
Onyx’s log-based design provides open-ended access to react to all coordination events. This section describes how to tap into these notifications.
Explanation
Onyx uses an internal log to totally order all coordination events across nodes in the cluster. This log is maintained as a directory of sequentially ordered znodes in ZooKeeper. It’s often of interest to watch the events as they are written to the log. For instance, you may want to know when a particular task is completed, or when a peer joins or leaves the cluster. You can use the log subscriber to do just that.
Subscribing to the Log
The following is a complete example that pretty prints all events as they are written to the log. You need to provide the ZooKeeper address, Onyx tenancy ID, and shared job scheduler in the peer config. The subscriber will automatically recover from sequential read errors in the case that a garbage collection is triggered, deleting log entries in its path.
(def peer-config
{:zookeeper/address "127.0.0.1:2181"
:onyx/tenancy-id onyx-id
:onyx.peer/job-scheduler :onyx.job-scheduler/round-robin})
(def ch (chan 100))
(def subscription (onyx.api/subscribe-to-log peer-config ch))
(def log (:log (:env subscription)))
;; Loops forever
(loop [replica (:replica subscription)]
(let [entry (<!! ch)
new-replica (onyx.extensions/apply-log-entry entry replica)]
(clojure.pprint/pprint new-replica)
(recur new-replica)))
(onyx.api/shutdown-env (:env subscription))
Some example output from a test, printing the log position, log entry content, and the replica as-of that log entry:
====
Log Entry #0
Entry is {:message-id 0, :fn :prepare-join-cluster, :args {:joiner #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"}}
Replica is:
{:peer-state {#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577" :idle},
:peers [#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"],
:job-scheduler :onyx.job-scheduler/greedy}
====
Log Entry #1
Entry is {:message-id 1, :fn :prepare-join-cluster, :args {:joiner #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"}}
Replica is:
{:prepared
{#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
#uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"},
:peer-state {#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577" :idle},
:peers [#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"],
:job-scheduler :onyx.job-scheduler/greedy}
====
Log Entry #2
Entry is {:message-id 2, :fn :notify-join-cluster, :args {:observer #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf", :subject #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"}}
Replica is:
{:accepted
{#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
#uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"},
:prepared {},
:peer-state {#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577" :idle},
:peers [#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"],
:job-scheduler :onyx.job-scheduler/greedy}
====
Log Entry #3
Entry is {:message-id 3, :fn :accept-join-cluster, :args {:observer #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf", :subject #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577", :accepted-observer #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577", :accepted-joiner #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"}}
Replica is:
{:pairs
{#uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"
#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577",
#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
#uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"},
:accepted {},
:prepared {},
:peer-state
{#uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf" :idle,
#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577" :idle},
:peers
[#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
#uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"],
:job-scheduler :onyx.job-scheduler/greedy}
====
Log Entry #4
Entry is {:message-id 4, :fn :prepare-join-cluster, :args {:joiner #uuid "010a1688-47ff-4055-8da5-1f02247351e1"}}
Replica is:
{:pairs
{#uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"
#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577",
#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
#uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"},
:accepted {},
:prepared
{#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
#uuid "010a1688-47ff-4055-8da5-1f02247351e1"},
:peer-state
{#uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf" :idle,
#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577" :idle},
:peers
[#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
#uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"],
:job-scheduler :onyx.job-scheduler/greedy}
====
Log Entry #5
Entry is {:message-id 5, :fn :notify-join-cluster, :args {:observer #uuid "010a1688-47ff-4055-8da5-1f02247351e1", :subject #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"}}
Replica is:
{:pairs
{#uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"
#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577",
#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
#uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"},
:accepted
{#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
#uuid "010a1688-47ff-4055-8da5-1f02247351e1"},
:prepared {},
:peer-state
{#uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf" :idle,
#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577" :idle},
:peers
[#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
#uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"],
:job-scheduler :onyx.job-scheduler/greedy}
====
Log Entry #6
Entry is {:message-id 6, :fn :accept-join-cluster, :args {:observer #uuid "010a1688-47ff-4055-8da5-1f02247351e1", :subject #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf", :accepted-observer #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577", :accepted-joiner #uuid "010a1688-47ff-4055-8da5-1f02247351e1"}}
Replica is:
{:pairs
{#uuid "010a1688-47ff-4055-8da5-1f02247351e1"
#uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf",
#uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"
#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577",
#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
#uuid "010a1688-47ff-4055-8da5-1f02247351e1"},
:accepted {},
:prepared {},
:peer-state
{#uuid "010a1688-47ff-4055-8da5-1f02247351e1" :idle,
#uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf" :idle,
#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577" :idle},
:peers
[#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
#uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"
#uuid "010a1688-47ff-4055-8da5-1f02247351e1"],
:job-scheduler :onyx.job-scheduler/greedy}
====
Log Entry #7
Entry is {:message-id 7, :fn :prepare-join-cluster, :args {:joiner #uuid "e6c35131-f4d9-432d-8915-e8616851bb1c"}}
Replica is:
{:pairs
{#uuid "010a1688-47ff-4055-8da5-1f02247351e1"
#uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf",
#uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"
#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577",
#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
#uuid "010a1688-47ff-4055-8da5-1f02247351e1"},
:accepted {},
:prepared
{#uuid "010a1688-47ff-4055-8da5-1f02247351e1"
#uuid "e6c35131-f4d9-432d-8915-e8616851bb1c"},
:peer-state
{#uuid "010a1688-47ff-4055-8da5-1f02247351e1" :idle,
#uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf" :idle,
#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577" :idle},
:peers
[#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
#uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"
#uuid "010a1688-47ff-4055-8da5-1f02247351e1"],
:job-scheduler :onyx.job-scheduler/greedy}
====
Log Entry #8
Entry is {:message-id 8, :fn :notify-join-cluster, :args {:observer #uuid "e6c35131-f4d9-432d-8915-e8616851bb1c", :subject #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"}}
Replica is:
{:pairs
{#uuid "010a1688-47ff-4055-8da5-1f02247351e1"
#uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf",
#uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"
#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577",
#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
#uuid "010a1688-47ff-4055-8da5-1f02247351e1"},
:accepted
{#uuid "010a1688-47ff-4055-8da5-1f02247351e1"
#uuid "e6c35131-f4d9-432d-8915-e8616851bb1c"},
:prepared {},
:peer-state
{#uuid "010a1688-47ff-4055-8da5-1f02247351e1" :idle,
#uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf" :idle,
#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577" :idle},
:peers
[#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
#uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"
#uuid "010a1688-47ff-4055-8da5-1f02247351e1"],
:job-scheduler :onyx.job-scheduler/greedy}
====
Log Entry #9
Entry is {:message-id 9, :fn :accept-join-cluster, :args {:observer #uuid "e6c35131-f4d9-432d-8915-e8616851bb1c", :subject #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf", :accepted-observer #uuid "010a1688-47ff-4055-8da5-1f02247351e1", :accepted-joiner #uuid "e6c35131-f4d9-432d-8915-e8616851bb1c"}}
Replica is:
{:pairs
{#uuid "e6c35131-f4d9-432d-8915-e8616851bb1c"
#uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf",
#uuid "010a1688-47ff-4055-8da5-1f02247351e1"
#uuid "e6c35131-f4d9-432d-8915-e8616851bb1c",
#uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"
#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577",
#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
#uuid "010a1688-47ff-4055-8da5-1f02247351e1"},
:accepted {},
:prepared {},
:peer-state
{#uuid "e6c35131-f4d9-432d-8915-e8616851bb1c" :idle,
#uuid "010a1688-47ff-4055-8da5-1f02247351e1" :idle,
#uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf" :idle,
#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577" :idle},
:peers
[#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
#uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"
#uuid "010a1688-47ff-4055-8da5-1f02247351e1"
#uuid "e6c35131-f4d9-432d-8915-e8616851bb1c"],
:job-scheduler :onyx.job-scheduler/greedy}
====
Log Entry #10
Entry is {:message-id 10, :fn :prepare-join-cluster, :args {:joiner #uuid "bf8fd5fc-30fd-424c-af6a-0b32568581a4"}}
Replica is:
{:pairs
{#uuid "e6c35131-f4d9-432d-8915-e8616851bb1c"
#uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf",
#uuid "010a1688-47ff-4055-8da5-1f02247351e1"
#uuid "e6c35131-f4d9-432d-8915-e8616851bb1c",
#uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"
#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577",
#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
#uuid "010a1688-47ff-4055-8da5-1f02247351e1"},
:accepted {},
:prepared
{#uuid "010a1688-47ff-4055-8da5-1f02247351e1"
#uuid "bf8fd5fc-30fd-424c-af6a-0b32568581a4"},
:peer-state
{#uuid "e6c35131-f4d9-432d-8915-e8616851bb1c" :idle,
#uuid "010a1688-47ff-4055-8da5-1f02247351e1" :idle,
#uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf" :idle,
#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577" :idle},
:peers
[#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
#uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"
#uuid "010a1688-47ff-4055-8da5-1f02247351e1"
#uuid "e6c35131-f4d9-432d-8915-e8616851bb1c"],
:job-scheduler :onyx.job-scheduler/greedy}
====
Log Entry #11
Entry is {:message-id 11, :fn :notify-join-cluster, :args {:observer #uuid "bf8fd5fc-30fd-424c-af6a-0b32568581a4", :subject #uuid "e6c35131-f4d9-432d-8915-e8616851bb1c"}}
Replica is:
{:pairs
{#uuid "e6c35131-f4d9-432d-8915-e8616851bb1c"
#uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf",
#uuid "010a1688-47ff-4055-8da5-1f02247351e1"
#uuid "e6c35131-f4d9-432d-8915-e8616851bb1c",
#uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"
#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577",
#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
#uuid "010a1688-47ff-4055-8da5-1f02247351e1"},
:accepted
{#uuid "010a1688-47ff-4055-8da5-1f02247351e1"
#uuid "bf8fd5fc-30fd-424c-af6a-0b32568581a4"},
:prepared {},
:peer-state
{#uuid "e6c35131-f4d9-432d-8915-e8616851bb1c" :idle,
#uuid "010a1688-47ff-4055-8da5-1f02247351e1" :idle,
#uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf" :idle,
#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577" :idle},
:peers
[#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
#uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"
#uuid "010a1688-47ff-4055-8da5-1f02247351e1"
#uuid "e6c35131-f4d9-432d-8915-e8616851bb1c"],
:job-scheduler :onyx.job-scheduler/greedy}
====
Log Entry #12
Entry is {:message-id 12, :fn :accept-join-cluster, :args {:observer #uuid "bf8fd5fc-30fd-424c-af6a-0b32568581a4", :subject #uuid "e6c35131-f4d9-432d-8915-e8616851bb1c", :accepted-observer #uuid "010a1688-47ff-4055-8da5-1f02247351e1", :accepted-joiner #uuid "bf8fd5fc-30fd-424c-af6a-0b32568581a4"}}
Replica is:
{:pairs
{#uuid "bf8fd5fc-30fd-424c-af6a-0b32568581a4"
#uuid "e6c35131-f4d9-432d-8915-e8616851bb1c",
#uuid "e6c35131-f4d9-432d-8915-e8616851bb1c"
#uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf",
#uuid "010a1688-47ff-4055-8da5-1f02247351e1"
#uuid "bf8fd5fc-30fd-424c-af6a-0b32568581a4",
#uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"
#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577",
#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
#uuid "010a1688-47ff-4055-8da5-1f02247351e1"},
:accepted {},
:prepared {},
:peer-state
{#uuid "bf8fd5fc-30fd-424c-af6a-0b32568581a4" :idle,
#uuid "e6c35131-f4d9-432d-8915-e8616851bb1c" :idle,
#uuid "010a1688-47ff-4055-8da5-1f02247351e1" :idle,
#uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf" :idle,
#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577" :idle},
:peers
[#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
#uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"
#uuid "010a1688-47ff-4055-8da5-1f02247351e1"
#uuid "e6c35131-f4d9-432d-8915-e8616851bb1c"
#uuid "bf8fd5fc-30fd-424c-af6a-0b32568581a4"
],
:job-scheduler :onyx.job-scheduler/greedy}
====
Log Entry #13
Entry is {:message-id 13, :fn :submit-job, :args {:id #uuid "b784ebb4-356f-4e16-8eac-60e051d69ab7", :tasks [#uuid "ce13205e-937b-4af6-9aa9-d5149b31fb2c" #uuid "948f8595-3a0a-4318-b128-91c1d22c0158" #uuid "fb86b977-d668-4c98-abaa-80ee0d29663a"], :task-scheduler :onyx.task-scheduler/round-robin, :saturation Infinity, :task-saturation {#uuid "ce13205e-937b-4af6-9aa9-d5149b31fb2c" Infinity, #uuid "948f8595-3a0a-4318-b128-91c1d22c0158" Infinity, #uuid "fb86b977-d668-4c98-abaa-80ee0d29663a" Infinity}}}
Replica is:
{:job-scheduler :onyx.job-scheduler/greedy,
:saturation {#uuid "b784ebb4-356f-4e16-8eac-60e051d69ab7" Infinity},
:peers
[#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
#uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"
#uuid "010a1688-47ff-4055-8da5-1f02247351e1"
#uuid "e6c35131-f4d9-432d-8915-e8616851bb1c"
#uuid "bf8fd5fc-30fd-424c-af6a-0b32568581a4"],
:accepted {},
:jobs [#uuid "b784ebb4-356f-4e16-8eac-60e051d69ab7"],
:tasks
{#uuid "b784ebb4-356f-4e16-8eac-60e051d69ab7"
[#uuid "ce13205e-937b-4af6-9aa9-d5149b31fb2c"
#uuid "948f8595-3a0a-4318-b128-91c1d22c0158"
#uuid "fb86b977-d668-4c98-abaa-80ee0d29663a"]},
:pairs
{#uuid "bf8fd5fc-30fd-424c-af6a-0b32568581a4"
#uuid "e6c35131-f4d9-432d-8915-e8616851bb1c",
#uuid "e6c35131-f4d9-432d-8915-e8616851bb1c"
#uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf",
#uuid "010a1688-47ff-4055-8da5-1f02247351e1"
#uuid "bf8fd5fc-30fd-424c-af6a-0b32568581a4",
#uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"
#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577",
#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
#uuid "010a1688-47ff-4055-8da5-1f02247351e1"},
:allocations {#uuid "b784ebb4-356f-4e16-8eac-60e051d69ab7" {}},
:prepared {},
:peer-state
{#uuid "bf8fd5fc-30fd-424c-af6a-0b32568581a4" :idle,
#uuid "e6c35131-f4d9-432d-8915-e8616851bb1c" :idle,
#uuid "010a1688-47ff-4055-8da5-1f02247351e1" :idle,
#uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf" :idle,
#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577" :idle},
:task-schedulers
{#uuid "b784ebb4-356f-4e16-8eac-60e051d69ab7"
:onyx.task-scheduler/round-robin},
:task-saturation
{#uuid "b784ebb4-356f-4e16-8eac-60e051d69ab7"
{#uuid "ce13205e-937b-4af6-9aa9-d5149b31fb2c" Infinity,
#uuid "948f8595-3a0a-4318-b128-91c1d22c0158" Infinity,
#uuid "fb86b977-d668-4c98-abaa-80ee0d29663a" Infinity}}}
====
...
Plugins
Plugins serve as an abstraction for composing mechanisms that get data in and out of Onyx. See the README.md of the project for a list of official Onyx plugins, or keep reading to roll your own.
Interfaces
In order to implement a plugin, one or more protocols need to be implemented from the Pipeline Extensions API. Reader plugins will implement PipelineInput and Pipeline. Writer plugins will implement Pipeline. See the docstrings for instructions on implementation.
Templates
To help move past the boilerplate of creating new plugins, use Leiningen
with onyx-plugin
to
generate a template.
Coordination within Plugins
Often, virtual peers allocated to a task may need to coordinate with respect to allocating work. For example, a Kafka reader task may need to assign partitions to different peers on the same topic. The Onyx mechanism for coordinating peers is the log. The Onyx log can be extended by plugins by implementing several onyx.extensions defmethods.
For example:
(ns your.plugin.log-commands
(:require [onyx.extensions :as extensions]))
(defmethod extensions/apply-log-entry :yourplugin/coordination-type
[{:keys [args]} replica]
replica)
(defmethod extensions/replica-diff :yourplugin/coordination-type
[{:keys [args]} old new]
{})
(defmethod extensions/reactions :yourplugin/coordination-type
[{:keys [args]} old new diff peer-args]
[])
(defmethod extensions/fire-side-effects! :yourplugin/coordination-type
[{:keys [args]} old new diff {:keys [monitoring] :as state}]
state)
When modifying the replica, please assoc-in into replica under [:task-metadata job-id task-id], so that it will be cleaned up when the job is completed or killed.
onyx-core-async
Onyx plugin providing read and write facilities for Clojure core.async.
Installation
This plugin is included with Onyx. You do not need to add it as a separate dependency.
In your peer boot-up namespace:
(:require [onyx.plugin.core-async])
Functions
read-from-chan
Catalog entry:
{:onyx/name :in
:onyx/plugin :onyx.plugin.core-async/input
:onyx/type :input
:onyx/medium :core.async
:onyx/batch-size batch-size
:onyx/max-peers 1
:onyx/doc "Reads segments from a core.async channel"}
Lifecycle entries:
[{:lifecycle/task :your-task-name
:lifecycle/calls :my.ns/in-calls}
{:lifecycle/task :your-task-name
:lifecycle/calls :onyx.plugin.core-async/reader-calls}]
There’s a little extra baggage with core.async because you need a
reference to the channel. Make sure that my.ns/in-calls
is a map that
references a function to inject the channel in:
(def in-chan (chan capacity))
(defn inject-in-ch [event lifecycle]
{:core.async/chan in-chan})
(def in-calls
{:lifecycle/before-task-start inject-in-ch})
write-to-chan
Catalog entry:
{:onyx/name :out
:onyx/plugin :onyx.plugin.core-async/output
:onyx/type :output
:onyx/medium :core.async
:onyx/batch-size batch-size
:onyx/max-peers 1
:onyx/doc "Writes segments to a core.async channel"}
Lifecycle entries:
[{:lifecycle/task :your-task-name
:lifecycle/calls :my.ns/out-calls}
{:lifecycle/task :your-task-name
:lifecycle/calls :onyx.plugin.core-async/writer-calls}]
Again, as with read-from-chan
, there’s a little extra to do since
core.async has some exceptional behavior compared to other plugins:
(def out-chan (chan capacity))
(defn inject-out-ch [event lifecycle]
{:core.async/chan out-chan})
(def out-calls
{:lifecycle/before-task-start inject-out-ch})
Logging
This chapter details how to inspect and modify the logs that Onyx produces.
Timbre
Onyx uses Timbre for logging.
By default, all Onyx output is logged to a file called onyx.log
in the
same directory as where the peer or coordinator jar is executing. The
logging configuration can be overridden completely, see below.
Onyx’s default logging configuration also writes all WARN and FATAL messages to standard out.
Overriding the log file name and path
In order to override the log file location add :onyx.log/file
to both
the environment config sent into start-env
as well as the peer config
sent into start-peer-group
.
Both relative and absolute paths are supported by Timbre.
Overriding the Timbre log config
Similarly, to override the full Timbre log config map, construct the
Timbre configuration and add it to both the environment and peer config
maps under the :onyx.log/config
key. Note that the onyx.log/config
map will be merged with the existing Timbre configuration rather than
replacing it completely. In practice this means that extra configuration
must be sent in to, for example, disable appenders that are enabled by
default. See the examples below.
Examples
The following example simply changes the output file.
(let [log-config {:onyx.log/file "/var/log/onyx.log"}
peer-config (merge my-peer-config log-config)
env-config (merge my-env-config log-config)]
(onyx.api/start-env env-config)
(onyx.api/start-peer-group peer-config)
;; ...
)
This example uses Timbre to redirect Onyx logs into the regular Java logging system using the log-config library.
(require '[com.palletops.log-config.timbre.tools-logging :as tl])
(let [log-config {:onyx.log/config {:appenders
{:spit {:enabled? false}
:standard-out {:enabled? false}
:rotor {:enabled? false}
:jl (tl/make-tools-logging-appender
{:enabled? true
:fmt-output-opts {:nofonts? true}})}
:min-level :trace}}
peer-config (merge my-peer-config log-config)
env-config (merge my-env-config log-config)]
(onyx/start-env env-config)
(onyx/start-peer-group peer-config)
;; ...
)
If you already have Timbre logging configured somewhere in your code
base, you can specify :onyx.log/config {}
to ensure your settings
remain unchanged. In effect, this simply merges in the empty map into
whatever settings you may have already specified for logging.
(let [log-config {:onyx.log/config {}}
peer-config (merge my-peer-config log-config)
env-config (merge my-env-config log-config)]
(onyx.api/start-env env-config)
(onyx.api/start-peer-group peer-config)
;; ...
)
See the Timbre
example
configuration for more information on valid values for the
:onyx.log/config
map.
Examples
Examples Project
See the Examples Project for a set of self-contained demonstrations of specific Onyx functionality.
Frequently Asked Questions
My job’s tasks never start
When your job’s tasks don’t start, you won’t see any messages in
onyx.log
on the peer that read something like:
Job fb2f2b80-2c5a-41b6-93c8-df35bffe6915 {:job-name :click-stream} - Task da5795a6-bd4c-41a1-843d-dda84cf5c615 :inc - Peer 2b35433a-b935-43b7-881b-4f4ec16672cc - Warming up task lifecycle {:id #uuid "da5795a6-bd4c-41a1-843d-dda84cf5c615", :name :inc, :egress-ids {:out #uuid "2c210c3e-6411-45fb-85a7-6cc733118215"}}
Instead, the log would show peers starting, but not doing anything. It looks something like this:
Starting Virtual Peer 111dcc44-7f53-41a4-8548-047803e8d441
Resolutions
Do you have enough virtual peers provisioned?
When a job is submitted to Onyx, it will only begin when there are
enough virtual peers to sustain its execution. How many are enough?
That depends. One virtual peer may work on at most one task at any
given time. By default, each task in your workflow requires one
available virtual peer to run it. If a grouping task is used, or if you
set :onyx/min-peers
on the task’s catalog entry, the number may be
greater than 1. You can statically determine how many peers a job needs
for execution with
this
simple helper function available in Onyx core.
As an example, if you submit a job with 10 tasks in its workflow, you
need at least 10 available virtual peers for your job to begin
running. If you only have 8, nothing will happen. If you have more than
10 (assuming that 10 is truly the minimum that you need), Onyx may
attempt to dedicate more resources to your job beyond 10 peers,
depending on which scheduler you used. You can control the number of
virtual peers that you’re running by modifying the parameters to
onyx.api/start-peers
.
We don’t emit a warning that your cluster is underprovisioned in terms of number of peers because its masterless design makes this difficult to asynchronously detect. This has been discussed in an issue. Onyx is designed to handle under and over provisioning as part of its normal flow, so this is not considered an error condition.
See the Scheduling chapter of the User Guide for a full explanation.
Are you using the Greedy job scheduler?
You’ve hit this case if you have more than one job running concurrently.
When you start your peers, you specified a job scheduler through the
configuration key :onyx.peer/job-scheduler
. If you specified
:onyx.job-scheduler/greedy
, you’ve asked that Onyx dedicate all
cluster resources to the first submitted job until that job is
completed. When that job completes, all resources will be assigned to
the next available job, and so on. If you want more than one job running
at a time, consider using the Balanced (onyx.job-scheduler/balanced
)
or Percentage (onyx.job-scheduler/percentage
) job schedulers. See the
Scheduling chapter of this User Guide for a full explanation.
Peers repeatedly echo that they’re warming up
If you see something like the following repeatedly being written to
onyx.log
, you’re in the right place:
Job fb2f2b80-2c5a-41b6-93c8-df35bffe6915 {:job-name :click-stream} - Task 2c210c3e-6411-45fb-85a7-6cc733118215 :out - Peer 3b2c6667-8f41-47a9-ba6b-f97c81ade828 - Peer chose not to start the task yet. Backing off and retrying...
What’s happening here is that your job’s tasks have started execution. Peers have been assigned tasks, and are beginning to get things in order to perform work on them. Before a peer can truly begin running a task, it must do three things:
1. Call all lifecycle hooks registered under lifecycle/start-task? sequentially, accruing their results.
2. Call the constructor for the input or output plugin, if this is an input or output task.
3. Emit a signal back to ZooKeeper indicating that this peer has opened up all of its sockets to receive incoming communication from other peers.
The results from the first step are a sequence of boolean values
designating whether the peer is allowed to start this task. If any of
the start-task?
lifecycle calls return false, the peer sleeps for a
short period to back off, then performs step 1 again. This process
continues until all lifecycle calls return true. Read the Lifecycles
chapter of this User Guide to learn why it’s useful to repeatedly try
and back off from task launching.
The second step will only occur after step 1 successfully completes - that is, all lifecycle functions return true. The reason you see "Backing off and retrying…" in the logs is because step 1 is being repeated again.
Resolutions
Do all your :lifecycle/start-task?
functions eventually return
true
?
If any of your lifecycle hooks for :lifecycle/start-task?
return
false
or nil
, you’ll want to change them to eventually return true
based upon some condition.
Are the connections that your plugin opens valid?
As per step 2, a plugin’s constructor will be called to open any relevant stateful connections, such as a database or socket connection. Many connection calls, such as opening a JDBC SQL connection, will block for prolonged periods of time before failing, unless otherwise configured. If the task that appears to be stuck is an input or output task, this is likely the cause. You may want to reconfigure your initial connections to fail faster to make it more obvious as to what’s happening.
None of my messages are being processed
If your onyx.log
shows messages that read as follows, your job’s
tasks have successfully started:
Job fb2f2b80-2c5a-41b6-93c8-df35bffe6915 {:job-name :click-stream} - Task da5795a6-bd4c-41a1-843d-dda84cf5c615 :inc - Peer 2b35433a-b935-43b7-881b-4f4ec16672cc - Enough peers are active, starting the task
If you suspect that messages are not being processed, it heavily depends on the input plugin that you’re using.
Resolutions
Try using :onyx/fn
to log all incoming messages
One thing that you can do for extra visibility is to log all incoming
messages from input tasks. This is inadvisable for production, but can
be useful for temporary debugging. You can specify an :onyx/fn
transformation function to any task, including inputs and outputs. It
can be useful to specify a debug function on your input tasks to see
which messages are entering the system. Be sure that this function
returns the original segment! For example, you can define a function:
(defn spy [segment]
(println "Read from input:" segment)
segment)
Then add :onyx/fn ::spy
to your input catalog entries.
Has your job been killed?
Unless otherwise configured by your Lifecycles, if any user-level code
throws an exception, the job will be killed and is no longer eligible
for execution. Check onyx.log
on all your peers to ensure that no
exceptions were thrown. If this were the case, you’d see messages lower
in the log reading:
Job fb2f2b80-2c5a-41b6-93c8-df35bffe6915 {:job-name :click-stream} - Task 1e83e005-3e2d-4307-907b-a3d66e3aa293 :in - Peer 111dcc44-7f53-41a4-8548-047803e8d441 - Stopping task lifecycle
Are you using the Kafka input plugin?
If you’re using the Kafka input plugin, make sure that you’re reading
from a reasonable starting offset of the topic. If you’ve set
:kafka/force-reset?
to true
in the catalog entry, and you’ve also
set :kafka/offset-reset
to :largest
, you’ve instructed Onyx to begin
reading messages from the end of the topic. Until you place more
messages into the topic, Onyx will sit idle waiting for more input. The
starting offset for each input task using Kafka is echoed out in
onyx.log
.
The same messages are being replayed multiple times
Message replay happens when a message enters Onyx from an input source, gets processed, and is seen again at a later point in time. Onyx replays messages for fault tolerance when it suspects that failure of some sort has occurred. You can read about how message replay is implemented, and why it exists, in the Architecture chapter of this User Guide.
There are many reasons why a message may need to be replayed (every possible failure scenario), so we will limit our discussion to controlling replay frequency. See the performance tuning sections of this document for more context about what value is appropriate to set for the replay frequency.
Resolutions
Is your :onyx/pending-timeout
too low?
Messages are replayed from the input source if they do not complete
their route through the cluster within a particular period of time. This
period is controlled by the :onyx/pending-timeout
parameter to the
catalog entry, and its default is 60 seconds. You can read about its
specifics
in
the Cheatsheet. You should set this value high enough such that any
segment taking longer than this value to complete is highly likely to
have encountered a failure scenario.
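For example, a sketch of raising the timeout on a hypothetical input catalog entry, assuming the value is given in milliseconds:
{:onyx/name :read-events
 :onyx/plugin :onyx.plugin.core-async/input
 :onyx/type :input
 :onyx/medium :core.async
 :onyx/pending-timeout 120000 ;; allow 2 minutes before an uncompleted segment is replayed
 :onyx/batch-size 20}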
My program starts running, but then it stalls
Programs that begin healthy by processing messages and then stall are typically indicative of user-level code problems. We outline a few common cases here.
Resolutions
Does onyx.log have any exceptions in it?
Most exceptions will kill the job in question. If you are simply
monitoring progress by reading from an output data source through Onyx,
you should check all of the peer onyx.log
files for exceptions that
may have killed the job.
Are any user-level functions blocking?
Any implementations of :onyx/fn
that are blocking will halt progress
of all other segments that are directly lined up behind it. Ensure that
user level functions finish up in a timely manner.
Are messages being replayed?
To get started, see the full section on how and why messages are being replayed. In short, messages will be replayed in 60 seconds if they are not completed. You may be experiencing initial success, followed by a runtime error that is causing temporarily lost segments before replay.
Are you using a core.async output plugin?
If you’re using a core.async output plugin writing to a channel that blocks writes when its buffer is full, you may have put enough messages onto the channel that core.async writes are now blocking, and hence stalling Onyx.
Are your peer hosts and ports advertised correctly?
Ensure that the host and port that the peer advertises to the rest of the cluster for incoming connections are correct. If they are incorrect, only tasks that are colocated on the same machine will have a chance of working. Remember that Onyx uses UDP as its messaging transport, so make sure that your security settings allow traffic through over that protocol.
The host is configured via the :onyx.messaging/bind-addr
key, and the
port is configured via the :onyx.messaging/peer-port
key.
Peer fails to start, and throws
java.io.IOException: No space left on device
This exception commonly occurs when running Onyx inside of a Docker
container. Aeron requires more shared memory than the container
allocates by default. You can solve this problem by starting your
container with a larger amount of shared memory by specifying
--shm-size
on Docker >= 1.10.
Aeron Media Driver crashes the JVM with SIGBUS
This exception can occur when Aeron does not have enough shared memory. Increase the amount of shared memory that is set as described above.
Peer fails to start, and throws
org.apache.bookkeeper.bookie.BookieException$InvalidCookieException: Cookie
This exception occurs due to a bug in BookKeeper reconnection to
ZooKeeper before its ephemeral node expires. We are currently surveying
our own workarounds until this is patched, but for now the thing to do
is to delete /tmp/bookkeeper_journal
and /tmp/bookkeeper_ledger
on
the host. Restart the peer, and all will be well.
Peer fails to start, and throws
java.lang.IllegalStateException: aeron cnc file version not understood
This exception occurs when Aeron’s version is upgraded or downgraded between incompatible versions. The exception will also provide a path on the OS to some Aeron files. Shut down the peer, delete that directory, then restart the peer.
Peer fails to start, and throws
Failed to connect to the Media Driver - is it currently running?
This message is thrown when the peer tries to start, but can’t engage
Aeron in its local environment. Aeron can be run in embedded mode by
switching :onyx.messaging.aeron/embedded-driver?
to true
, or by
running it out of process on the peer machine, which is the recommended
production setting. If you’re running it out of process, ensure that it
didn’t go down when you encounter this message. You should run Aeron
through a process monitoring tool such as monit
when running it out of
process.
Peer fails to start, and throws
uk.co.real_logic.aeron.driver.exceptions.ActiveDriverException: active driver detected
You have encountered the following exception:
uk.co.real_logic.aeron.driver.exceptions.ActiveDriverException: active driver detected clojure.lang.ExceptionInfo: Error in component :messaging-group in system onyx.system.OnyxPeerGroup calling #'com.stuartsierra.component/start
This is because you have started your peer-group twice without shutting
it down. Alternatively, you may be using
:onyx.messaging.aeron/embedded-driver? true
in your peer-group and
starting a media driver externally. Only one media driver can be started
at a time.
Application fails to build an uberjar, and throws
java.lang.UnsupportedClassVersionError: uk.co.real_logic/aeron/Aeron$Context : Unsupported major.minor version 52.0
You have encountered the following exception:
java.lang.UnsupportedClassVersionError: uk.co.real_logic/aeron/Aeron$Context : Unsupported major.minor version 52.0
This is because you are trying to build/run an Onyx app with a JRE version lower than 1.8. Onyx supports Java 1.8 only.
Peer fails to start, and throws
org.apache.bookkeeper.proto.WriteEntryProcessorV3: Error writing entry:X to ledger:Y
You have encountered the following exception:
2015-12-16 16:59:35 ERROR org.apache.bookkeeper.proto.WriteEntryProcessorV3: Error writing entry:0 to ledger:2 org.apache.bookkeeper.bookie.Bookie$NoLedgerException: Ledger 2 not found
Your ZooKeeper directory has been cleared out of information that points to the BookKeeper servers, and the two processes can’t sync up. This can be fixed by removing the data directory from the BookKeeper servers and ZooKeeper servers.
My program begins running, but throws
No implementation of method: :read-char of protocol: #'clojure.tools.reader.reader-types/Reader found for class
You’ll encounter this exception when your :onyx/fn
returns something
that is not EDN and Nippy serializable, which is required to send it
over the network. Ensure that return values from :onyx/fn
return
either a map, or a vector of maps. All values within must be EDN
serializable.
What does Onyx use internally for compression by default?
Unless otherwise overridden in the Peer Pipeline API, Onyx will use
Nippy. This can be overridden by
setting the peer configuration with :onyx.messaging/compress-fn
and
:onyx.messaging/decompress-fn
. See the Information Model documentation
for more information.
How can I filter segments from being output from my tasks?
Use Flow Conditions (see the Flow Conditions chapter of this User Guide) or return an empty vector from your :onyx/fn.
Can I return more than one segment from a function?
Return a vector of maps from :onyx/fn
instead of a map. All maps at
the top level of the vector will be unrolled and pushed downstream.
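For example, a minimal sketch in which the segment shape is hypothetical:
(require '[clojure.string :as string])

;; Splits one incoming segment into several outgoing segments, one per word.
;; Each map in the returned vector is unrolled and pushed downstream.
(defn explode-words [segment]
  (mapv (fn [w] {:word w})
        (string/split (:sentence segment) #"\s+")))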
Should I be worried about user-level KeeperException
in ZooKeeper
logs?
You should monitor these; however, KeeperErrorCode = NodeExists errors are probably fine:
2015-11-05 15:12:51,332 [myid:] - INFO [ProcessThread(sid:0 cport:-1)::PrepRequestProcessor@645] - Got user-level KeeperException when processing sessionid:0x150d67d0cd10003 type:create cxid:0xa zxid:0x50 txntype:-1 reqpath:n/a Error Path:/onyx/0e14715d-51b9-4e2b-af68-d5292f276afc/windows Error:KeeperErrorCode = NodeExists for /onyx/0e14715d-51b9-4e2b-af68-d5292f276afc/windows
This is a peer just trying to recreate a ZooKeeper path that was already created by another peer, and it can be safely ignored.
How should I benchmark on a single machine?
Definitely turn off messaging short circuiting, as messaging short circuiting will improve performance in a way that is unrealistic for multi-node use. Remember to turn messaging short circuiting back on for production use, as it does improve performance overall.