What does Onyx offer?

In this chapter, I’ll enumerate and explain the reasons why we built Onyx.

An Information Model

Information models are often superior to APIs, and almost always better than DSLs. The hyper-flexibility of a data structure literal allows Onyx workflows and catalogs to be constructed at a distance, meaning on another machine, in a different language, by another program, etc.

The information model for an Onyx workflow has the distinct advantage that it’s possible to compile other workflow representations (perhaps a datalog or SQL query) into the workflow that Onyx understands. The Information Model chapter describes a target for data structure compilation.
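As a minimal sketch of what compiling into the workflow format can look like, the snippet below turns an adjacency map into a vector of edges. The adjacency-map input and the name `adjacency->workflow` are illustrative assumptions, not part of Onyx.

```clojure
;; Hypothetical helper (not part of Onyx): compiles a map of
;; task -> downstream tasks into a vector of [from to] edges.
(defn adjacency->workflow
  [adjacency]
  (vec (for [[from tos] (sort-by key adjacency) ; sort for a stable edge order
             to tos]
         [from to])))

(def workflow
  (adjacency->workflow {:in [:increment]
                        :increment [:out]}))
;; => [[:in :increment] [:increment :out]]
```

Because the target is plain data, the compiler is an ordinary function and its output can be inspected before it is ever submitted.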

Temporal Decoupling

To the extent that Onyx places data at the highest importance, very few Onyx constructs actually need to be generated at the same time as program deployment or peer registration. Programs can create workflows, drop them to a database, and pull them out at a later time without any problems.
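A small sketch of that round trip, assuming the workflow is stored as an EDN string (the storage step itself is left out):

```clojure
(require '[clojure.edn :as edn])

(def workflow [[:in :increment] [:increment :out]])

;; Serialize now, for example into a database column...
(def stored (pr-str workflow))

;; ...and read it back later, possibly from a different process.
(def restored (edn/read-string stored))

(= workflow restored)
;; => true
```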

Elimination of Macros

Macros are a tremendously powerful tool, but are often inappropriate for end-user consumption of an API. Onyx goes beyond Storm’s defbolt and defspout by making vanilla Clojure functions shine. These functions need no context to execute and do not require any dynamic bindings. They receive all information they need via parameters, which are injected by Onyx’s task lifecycles.

Plain Clojure Functions

To the same point above, we want plain Clojure functions to be the building blocks for application logic. Onyx’s functions can be tested directly without any special test runner.
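For instance, a task function might look like the sketch below. The body of `my-inc` and the `:n` key are assumptions made for illustration.

```clojure
;; An ordinary Clojure function: takes a segment (a map), returns a segment.
(defn my-inc
  [segment]
  (update segment :n inc))

;; Exercised directly at the REPL or from any test framework.
(my-inc {:n 41})
;; => {:n 42}
```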

Testing Without Mocking

In general, your design is in trouble when you’ve reached for with-redefs or something along those lines to mock functions. Onyx places a high importance on programming against interfaces, and even more so on putting space between small components with channels. Onyx programs can be tested in development mode, and moved to production mode with only a small configuration file change. If you’d like to change your input or output plugins, all you need to do is re-associate the catalog entry with something like an in-memory plugin. No interface mocking code required.
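Since the catalog is a vector of maps, re-associating an entry is plain data manipulation. The sketch below assumes a helper named `override-entry` and a made-up production plugin keyword; neither is part of Onyx.

```clojure
;; Hypothetical helper (not part of Onyx): merge new keys into the
;; catalog entry whose :onyx/name matches task-name.
(defn override-entry
  [catalog task-name overrides]
  (mapv (fn [entry]
          (if (= task-name (:onyx/name entry))
            (merge entry overrides)
            entry))
        catalog))

(def production-catalog
  [{:onyx/name :in
    :onyx/plugin :my.plugin/durable-input ; hypothetical plugin keyword
    :onyx/type :input
    :onyx/medium :my-queue}               ; hypothetical medium
   {:onyx/name :inc
    :onyx/fn :my.ns/my-inc
    :onyx/type :function}])

;; Same catalog, with the input swapped for the core.async plugin.
(def test-catalog
  (override-entry production-catalog :in
                  {:onyx/plugin :onyx.plugin.core-async/input
                   :onyx/medium :core.async}))
```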

Easy Parameterization of Workflows

It’s particularly telling that many distributed frameworks don’t offer an easy way to parameterize workflows. Onyx puts space between the caller and the function definition. Parameterize tasks inside the catalog, and update the catalog entry at will. Additionally, Onyx allows peers to spin up their own parameters at boot-up time.
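The sketch below illustrates the idea of keeping parameters in the catalog entry. It assumes the entry lists the keys to inject under `:onyx/params`; `:my/k` is a made-up key, and `bind-params` is a hypothetical stand-in for what the framework would do at task start.

```clojure
;; Task function: the injected parameter comes first, the segment last.
(defn add-k
  [k segment]
  (update segment :n + k))

;; Catalog entry carrying the parameter as data.
(def entry
  {:onyx/name :add
   :onyx/fn ::add-k
   :onyx/type :function
   :my/k 10                 ; made-up parameter key
   :onyx/params [:my/k]})   ; assumed: keys to inject as leading arguments

;; Hypothetical stand-in for the framework: look up the listed keys
;; in the entry and partially apply them to the task function.
(defn bind-params
  [f entry]
  (apply partial f (map entry (:onyx/params entry))))

(def task-fn (bind-params add-k entry))

(task-fn {:n 1})
;; => {:n 11}
```

Changing the behavior then means updating `:my/k` in the entry; `add-k` itself is untouched.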

Transparent Code Reuse for Batch and Streaming

Onyx uses the notion of a sentinel value to transparently switch between streaming and batching modes. This makes it easy to reuse the same code for both batch and streaming computations.

AOT Nothing

Onyx AOT’s absolutely nothing on your behalf. When you’re ready to stand your jar up, simply uberjar and start executing on the target machine. Hadoop and Storm cause dependency hell by providing their own dependencies on top of yours (in Storm’s case, you’re restricted to a particular version of Clojure because you’re locked in by the Executor). Onyx won’t mess with your dependencies.

You can, however, AOT Onyx yourself to speed up compilation times.

Concepts

We’ll take a quick overview of some terms you’ll see in the rest of this user guide.

Terminology

Segment

A segment is the unit of data in Onyx, and it’s represented by a Clojure map. Segments represent the data flowing through the cluster. Segments are the only shape of data that Onyx allows you to emit between functions.

Task

A task is the smallest unit of work in Onyx. It represents an activity of either input, processing, or output.

Workflow

A workflow is the structural specification of an Onyx program. Its purpose is to articulate the paths that data flows through the cluster at runtime. It is specified via a directed, acyclic graph.

The workflow representation is a Clojure vector of vectors. Each inner vector contains exactly two elements, which are keywords. The keywords represent nodes in the graph, and the vector represents a directed edge from the first node to the second.

;;;    in
;;;    |
;;; increment
;;;    |
;;;  output
[[:in :increment] [:increment :out]]

;;;            input
;;;             /\
;;; processing-1 processing-2
;;;     |             |
;;;  output-1      output-2

[[:input :processing-1]
 [:input :processing-2]
 [:processing-1 :output-1]
 [:processing-2 :output-2]]

;;;            input
;;;             /\
;;; processing-1 processing-2
;;;         \      /
;;;          output

[[:input :processing-1]
 [:input :processing-2]
 [:processing-1 :output]
 [:processing-2 :output]]

Tip
Example projects: flat-workflow, multi-output-workflow
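Because a workflow is just a vector of edges, its shape can be inspected with ordinary functions. The helpers below are illustrative and not part of Onyx; they find the tasks with no upstream and no downstream edges.

```clojure
(require '[clojure.set :as set])

(def workflow
  [[:input :processing-1]
   [:input :processing-2]
   [:processing-1 :output]
   [:processing-2 :output]])

;; Tasks that only appear on the left of an edge have no upstream task.
(defn source-tasks
  [workflow]
  (set/difference (set (map first workflow))
                  (set (map second workflow))))

;; Tasks that only appear on the right of an edge have no downstream task.
(defn sink-tasks
  [workflow]
  (set/difference (set (map second workflow))
                  (set (map first workflow))))

(source-tasks workflow) ;; => #{:input}
(sink-tasks workflow)   ;; => #{:output}
```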

Catalog

All inputs, outputs, reducers, and functions in a workflow must be described via a catalog. A catalog is a vector of maps, strikingly similar to Datomic’s schema. Configuration and docstrings are described in the catalog.

Example:

[{:onyx/name :in
  :onyx/plugin :onyx.plugin.core-async/input
  :onyx/type :input
  :onyx/medium :core.async
  :onyx/batch-size batch-size
  :onyx/max-peers 1
  :onyx/doc "Reads segments from a core.async channel"}

 {:onyx/name :inc
  :onyx/fn :onyx.peer.min-peers-test/my-inc
  :onyx/type :function
  :onyx/batch-size batch-size}

 {:onyx/name :out
  :onyx/plugin :onyx.plugin.core-async/output
  :onyx/type :output
  :onyx/medium :core.async
  :onyx/batch-size batch-size
  :onyx/max-peers 1
  :onyx/doc "Writes segments to a core.async channel"}]
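Since every task named in a workflow must be described in the catalog, a quick consistency check is easy to write. The helper below is a hypothetical sketch, not an Onyx function.

```clojure
;; Hypothetical check: which workflow tasks have no catalog entry?
(defn missing-catalog-entries
  [workflow catalog]
  (let [described (set (map :onyx/name catalog))]
    (->> workflow
         (apply concat)   ; flatten edges into task names
         distinct
         (remove described)
         vec)))

(def catalog
  [{:onyx/name :in  :onyx/type :input}
   {:onyx/name :inc :onyx/type :function}
   {:onyx/name :out :onyx/type :output}])

(missing-catalog-entries [[:in :inc] [:inc :out]] catalog)
;; => []

(missing-catalog-entries [[:in :increment] [:increment :out]] catalog)
;; => [:increment]
```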

Flow Conditions

In contrast to a workflow, flow conditions specify, on a segment-by-segment basis, which direction data should flow, as determined by predicate functions. This is helpful for conditionally processing a segment based on its content.

Example:

[{:flow/from :input-stream
  :flow/to [:process-adults]
  :flow/predicate :my.ns/adult?
  :flow/doc "Emits segment if this segment is an adult."}]
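A predicate such as :my.ns/adult? is an ordinary function. The sketch below assumes the predicate receives the task's event map, the incoming segment, the emitted segment, and all segments emitted for that input; the `:age` key is also an assumption.

```clojure
;; Assumed argument order: event map, incoming segment, emitted segment,
;; and all segments emitted for the incoming one. Only the emitted
;; segment is inspected here.
(defn adult?
  [event old-segment segment all-new-segments]
  (>= (:age segment) 18))

(adult? {} {} {:age 21} []) ;; => true
(adult? {} {} {:age 12} []) ;; => false
```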

Function

A function is a construct that receives segments and emits segments for further processing. It literally is a Clojure function.

Lifecycle

A lifecycle is a construct that describes the lifetime of a task. There is an entire chapter devoted to lifecycles, but to be brief, a lifecycle allows you to hook in and execute arbitrary code at critical points during a task. A lifecycle carries a context map that you can merge results back into for use later.

Windows

Windows are a construct that partitions a possibly unbounded sequence of data into finite pieces, allowing aggregations to be specified. This lets you treat an infinite sequence of data as if it were finite over a given period of time.
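To make the idea concrete, here is a plain-Clojure sketch of fixed-size windowing with a sum aggregation. It does not use Onyx's windowing API; the `:t` (event time in seconds) and `:n` keys are assumptions.

```clojure
;; Hypothetical segments carrying an event time in seconds.
(def segments
  [{:t 1 :n 10} {:t 4 :n 5} {:t 6 :n 1} {:t 11 :n 7}])

;; Assign each segment to the window starting at the largest multiple of
;; window-size that is <= :t, and sum :n per window.
(defn sum-by-window
  [window-size segments]
  (reduce (fn [acc {:keys [t n]}]
            (update acc (* window-size (quot t window-size)) (fnil + 0) n))
          (sorted-map)
          segments))

(sum-by-window 5 segments)
;; => {0 15, 5 1, 10 7}
```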

Plugin

A plugin is a means for hooking into data sources to extract data as input and produce data as output. Onyx comes with a few plugins, but you can craft your own, too.

Sentinel

A sentinel is a value that can be pushed into Onyx to signal the end of a stream of data. This effectively lets Onyx switch between streaming and batching mode. The sentinel in Onyx is represented by the Clojure keyword :done.
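The effect can be modeled with lazy sequences. This is a sketch of the semantics only, not of how Onyx implements it; `process` and the `:n` key are made up.

```clojure
;; The same processing code for both modes: stop at :done if it appears.
(defn process
  [segments]
  (->> segments
       (take-while #(not= :done %))
       (map #(update % :n inc))))

;; Batch: the input ends with the sentinel, so processing terminates.
(def batch-result
  (vec (process [{:n 1} {:n 2} :done])))
;; => [{:n 2} {:n 3}]

;; Streaming: no sentinel ever arrives, so we only take what we need.
(def stream-result
  (take 3 (process (map (fn [n] {:n n}) (range)))))
;; => ({:n 1} {:n 2} {:n 3})
```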

Peer

A Peer is a node in the cluster responsible for processing data. A single "peer" refers to a physical machine, though we often use the terms peer and virtual peer interchangeably when the difference doesn’t matter.

Virtual Peer

A Virtual Peer refers to a single peer process running on a single physical machine. A single Virtual Peer executes at most one task at a time.

Job

A job is the collection of a workflow, catalog, flow conditions, lifecycles, and execution parameters. A job is the most coarse unit of work, and every task is associated with exactly one job - hence a peer can only be working on at most one job at any given time.

Architecture & Low Level Design

This chapter outlines how Onyx works on the inside to meet the required properties of a distributed data processing system. This is not a formal proof nor an iron-clad specification for other implementations of Onyx. I will do my best to be transparent about how everything is working under the hood - good and bad. If something doesn’t make sense, keep moving. There are inevitable forward references.

High Level Components

Peer

A Peer is a node in the cluster responsible for processing data. A peer generally refers to a physical machine, as it’s typical to only run one peer per machine.

Virtual Peer

A Virtual Peer refers to a single concurrent worker running on a single physical machine. Each virtual peer spawns a small number of threads since it uses asynchronous messaging. All virtual peers are equal, whether they are on the same physical machine or not. Virtual peers communicate segments directly to one another, and coordinate strictly via the log in ZooKeeper.

ZooKeeper

Apache ZooKeeper is used as both a storage and a communication layer. ZooKeeper takes care of things like CAS, consensus, and atomic counting. ZooKeeper watches are at the heart of how Onyx virtual peers detect machine failure.

Aeron

Aeron is the primary messaging transport layer. The transport layer is pluggable, though we don’t support any other transports at this time.

Asynchronous Barrier Snapshotting

Asynchronous Barrier Snapshotting (ABS) provides fault tolerance and exactly once processing by inserting and tracking barriers that flow through the Directed Acyclic Graph (DAG).

Motivation

ABS improves performance by reducing acking overhead present in Onyx 0.9, and allows for exactly once aggregations which do not require message de-duplication. In ABS, consistent state snapshots can be made by tracking and aligning the barriers, and snapshotting state at appropriate points of barrier alignment.

Concepts

Onyx 0.10.0 and above uses the Asynchronous Barrier Snapshotting method described in Lightweight Asynchronous Snapshots for Distributed Dataflows, Carbone et al., and Distributed Snapshots: Determining Global States of Distributed Systems, Chandy, Lamport to ensure fault tolerance and exactly once processing of data (not exactly once side effects!).

Every job is assigned a coordinator peer that notifies input peers when they should inject a barrier into their data stream (generally every n seconds). These barriers are tracked and aligned throughout the job, with each task snapshotting its state every time a barrier is aligned across all of its input channels.

Concepts:

  • Barrier: a message injected into the data stream, containing the epoch id of the barrier.

  • Epoch: the id of the barrier, re-starting from 0 whenever the cluster has performed a reallocation.

  • Coordinator: a process that injects a barrier into the data stream on a schedule.

  • Barrier Alignment: occurs when barriers with a particular id have been received from all input channels on a peer.

  • Snapshot: Peer state that can be stored whenever a barrier alignment has occurred.

  • Channel: Network messaging channel. A channel may become blocked while waiting for barrier alignment.
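Barrier alignment can be sketched as a pure state transition. The state shape and the name `receive-barrier` below are assumptions made for illustration; they are not Onyx internals.

```clojure
;; State for one peer:
;;   :channels  - the set of input channels the peer reads from
;;   :seen      - epoch -> set of channels whose barrier has arrived
;;   :snapshots - epochs for which a snapshot was taken
(defn receive-barrier
  [{:keys [channels] :as state} channel epoch]
  (let [state (update-in state [:seen epoch] (fnil conj #{}) channel)]
    (if (= channels (get-in state [:seen epoch]))
      ;; Aligned: every input channel delivered this epoch's barrier.
      (-> state
          (update :seen dissoc epoch)
          (update :snapshots conj epoch))
      ;; Not aligned yet: keep waiting on the remaining channels.
      state)))

(def agg1 {:channels #{:input1 :input2} :seen {} :snapshots []})

(def after-first (receive-barrier agg1 :input1 3))
(def after-both  (receive-barrier after-first :input2 3))

(:snapshots after-first) ;; => []
(:snapshots after-both)  ;; => [3]
```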

ABS Execution Example

Step 1: Coordinator peer emits barrier with epoch 3 after the coordinator period passes.

Coordinator emits epoch 3

Step 2: :input1 peer synchronizes on epoch 3, snapshots state to durable storage, and re-emits the barrier.

:input1 synchronizes and emits barrier

Step 3: :input2 peer synchronizes on epoch 3, snapshots state to durable storage, and re-emits the barrier. :agg1 reads barrier with epoch 3 from :input1, blocks the channel.

:input2 synchronizes and emits barrier

Step 4: :agg1 synchronizes on epoch 3, snapshots state to durable storage, and re-emits the barrier to :output.

:agg1 synchronizes and emits barrier

Phases of Execution

A batch of segments runs through the phases and execution states described here in the cheat sheet.

The Log

This design centers around a totally ordered sequence of commands using a log structure. The log acts as an immutable history and arbiter. It’s maintained through ZooKeeper using a directory of persistent, sequential znodes. Virtual peers act as processes that consume from the log. At the time a peer starts up, it initializes its local replica to the "empty state". Log entries represent deterministic, idempotent functions to be applied to the local replica. The peer plays the log from the beginning, applying each log entry to its local replica. The local replica is transformed from one value to another. As a reaction to each replica transformation, the peer may send more commands to the tail of the log. Peers may play the log at any rate. After each peer has played k log entries, all peers at time k will have exactly the same local replica. Peers store everything in memory - so if a peer fails, it simply reboots from scratch with a new identifier and plays the log forward from the beginning. Since it has a new identifier, it has no association to the commands it previously issued; this prevents live lock issues.
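Playing the log amounts to a reduction over its entries. The sketch below uses made-up entry types (`:add-peer`, `:remove-peer`) and a multimethod named `apply-entry`; it mirrors the idea rather than Onyx's actual log commands.

```clojure
;; Each entry is a map with :fn and :args; dispatch on :fn.
(defmulti apply-entry (fn [replica entry] (:fn entry)))

(defmethod apply-entry :add-peer
  [replica {:keys [args]}]
  (update replica :peers (fnil conj #{}) (:id args)))

(defmethod apply-entry :remove-peer
  [replica {:keys [args]}]
  (update replica :peers (fnil disj #{}) (:id args)))

;; Start from the empty replica and apply every entry in order.
(defn play
  [log]
  (reduce apply-entry {} log))

(def log
  [{:fn :add-peer    :args {:id :a}}
   {:fn :add-peer    :args {:id :b}}
   {:fn :remove-peer :args {:id :a}}])

(play (take 2 log)) ;; => {:peers #{:a :b}}
(play log)          ;; => {:peers #{:b}}
```

Any two peers that have played the same prefix of the log hold equal replicas, regardless of when each of them did so.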

The Inbox and Outbox

Every peer maintains its own inbox and outbox. Messages received appear in order on the inbox, and messages to-be-sent are placed in order on the outbox.

Messages arrive in the inbox as commands are proposed into the ZooKeeper log. Technically, the inbox need only be size 1 since all log entries are processed strictly in order. As an optimization, the peer can choose to read a few extra commands behind the one it’s currently processing. In practice, the inbox will probably be configured with a size greater than one.

The outbox is used to send commands to the log. Certain commands processed by the peer will generate other commands. For example, if a peer is idle and it receives a command notifying it about a new job, the peer will react by sending a command to the log requesting that it be allocated for work. Each peer can choose to pause or resume the sending of its outbox messages. This is useful when the peer is just acquiring membership to the cluster. It will have to play log commands to join the cluster fully, but it cannot volunteer to be allocated for work since it’s not officially yet a member of the cluster.

Applying Log Entries

This section describes how log entries are applied to the peer’s local replica. A log entry is a persistent, sequential znode. Its content is a map with keys :fn and :args. :fn is mapped to a keyword that finds this log entry’s implementation. :args is mapped to another map with any data needed to apply the log entry to the replica.

Peers begin with the empty state value, and local state. Local state maintains a mapping of things like the inbox and outbox - things that are specific to this peer, and presumably can’t be serialized as EDN.

Each peer starts a thread that listens for additions to the log. When the log gets a new entry, the peer calls onyx.extensions/apply-log-entry. This is a function that takes a log entry and the replica, and returns a new replica with the log entry applied to it. This is a value-to-value transformation.

diagram 1

A single peer begins with the empty replica ({}) and progressively applies log entries to the replica, advancing its state from one immutable value to the next.

diagram 2

A peer reads the first log entry and applies the function to its local replica, moving the replica into a state "as of" entry 0.

diagram 4

Because the application of functions from the log against the replica is deterministic and free of side effects, peers do not need to coordinate about the speed at which each plays the log. Peers read the log on completely independent timelines.

Peers effect change in the world by reacting to log entries. When a log entry is applied, the peer calls onyx.extensions/replica-diff, passing it the old and new replicas. The peer produces a value summarizing what changed. This diff is used in subsequent sections to decide how to react and what side-effects to carry out.

Next, the peer calls onyx.extensions/reactions on the old/new replicas, the diff, and its local state. The peer can decide to submit new entries back to the log as a reaction to the log entry it just saw. It might react to "submit-job" with "volunteer-for-task", for instance.
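The diff-then-react step can be sketched as two pure functions. The names `jobs-diff` and `react`, the `:jobs` replica key, and the local-state keys are all assumptions for illustration; they are not the signatures of the onyx.extensions functions.

```clojure
(require '[clojure.set :as set])

;; Summarize what changed between two replicas (only new jobs here).
(defn jobs-diff
  [old-replica new-replica]
  {:submitted (set/difference (set (:jobs new-replica))
                              (set (:jobs old-replica)))})

;; Decide which entries, if any, to append to the log in response.
(defn react
  [diff local-state]
  (if (and (seq (:submitted diff)) (:idle? local-state))
    [{:fn :volunteer-for-task :args {:id (:id local-state)}}]
    []))

(def diff (jobs-diff {:jobs []} {:jobs [:job-1]}))

(react diff {:id :p1 :idle? true})
;; => [{:fn :volunteer-for-task, :args {:id :p1}}]

(react diff {:id :p1 :idle? false})
;; => []
```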

diagram 5

After a peer reads a log entry and applies it to the local replica, it will (deterministically!) react by appending zero or more log entries to the tail of the log.

Finally, the peer can carry out side-effects by invoking onyx.extensions/fire-side-effects!. This function will do things like talking to ZooKeeper or writing to core.async channels. Isolating side effects means that a subset of the test suite can operate on pure functions alone. Each peer is tagged with a unique ID, and it looks for this ID in changes to its replica. The ID acts very much like the object-oriented "this", in that the peer uses the ID to differentiate itself and conditionally perform side effects across an otherwise uniformly behaving distributed system.

Joining the Cluster

Aside from the log structure and any strictly data/storage centric znodes, ZooKeeper maintains another directory for pulses. Each peer registers exactly one ephemeral node in the pulses directory. The name of this znode is a UUID.

3-Phase Cluster Join Strategy

When a peer wishes to join the cluster, it must engage in a 3-phase protocol. Three phases are required because the peer that is joining needs to coordinate with another peer to change its ZooKeeper watch. I call this process "stitching" a peer into the cluster.

The technique needs peers to play by the following rules:

  • Every peer must be watched by another peer in ZooKeeper, unless there is exactly one peer in the cluster - in which case there are no watches.

  • When a peer joins the cluster, all peers must form a "ring" in terms of who-watches-who. This makes failure repair very easy because peers can transitively close any gaps in the ring after machine failure.

  • As a peer joining the cluster begins playing the log, it must buffer all reactive messages unless otherwise specified. The buffered messages are flushed after the peer has fully joined the cluster. This is because a peer could volunteer to perform work, but later abort its attempt to join the cluster, and therefore not be able to carry out any work.

  • A peer picks another peer to watch by determining a candidate list of peers it can stitch into. This candidate list is sorted by peer ID. The target peer is chosen by taking the message id modulo the number of peers in the sorted candidate list. The peer chosen can’t be random because all peers will play the message to select a peer to stitch with, and they must all determine the same peer. Hence, the message modulo piece is a sort of "random seed" trick.

diagram 7

At monotonic clock value t = 42, the replica has the above :pairs key, which indicates who watches whom. As nodes are added, they maintain a ring formation so that every peer is watched by another.

The algorithm works as follows:

  • let S = the peer to stitch into the cluster

  • S sends a prepare-join-cluster command to the log, indicating its peer ID

  • S plays the log forward

  • Eventually, all peers encounter the prepare-join-cluster message that S sent

  • if the cluster size is 0:

    • S instantly becomes part of the cluster

    • S flushes its outbox of commands

  • if the cluster size (n) is >= 1:

    • let Q = this peer playing the log entry

    • let A = the set of all peers fully joined in the cluster

    • let X = the single peer paired with no one (case only when n = 1)

    • let P = set of all peers prepared to join the cluster

    • let D = set of all peers in A that are depended on by a peer in P

    • let V = sorted vector of (set-difference (set-union A X) D) by peer ID

    • if V is empty:

      • S sends an abort-join-cluster command to the log

      • when S encounters abort-join-cluster, it backs off and tries to join again later

    • let T = nth in V of message-id mod (count V)

    • let W = the peer that T watches

    • T adds a watch to S

    • T sends a notify-join-cluster command to the log, notifying S that it is watched, adding S to P

    • when S encounters notify-join-cluster:

      • it adds a watch to W

      • it sends an accept-join-cluster command, removing S from P, adding S to A

    • when accept-join-cluster has been encountered, this peer is part of the cluster

    • S flushes its outbox of commands

    • T drops its watch from W - it is now redundant, as S is watching W
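The selection of T from the algorithm above can be sketched as a pure function of the replica and the message id. The function name and the argument map keys are assumptions; the set arithmetic follows the definitions of A, X, D, and V.

```clojure
(require '[clojure.set :as set])

;; joined        = A, peers fully joined in the cluster
;; unpaired      = X, the single peer paired with no one (or empty)
;; prepared-deps = D, peers in A already depended on by a preparing peer
;; Returns T, or nil when V is empty (the joining peer must abort).
(defn stitch-target
  [{:keys [joined unpaired prepared-deps]} message-id]
  (let [v (vec (sort (set/difference (set/union joined unpaired)
                                     prepared-deps)))]
    (when (seq v)
      (nth v (mod message-id (count v))))))

(def replica
  {:joined #{:p1 :p2 :p3 :p4}
   :unpaired #{}
   :prepared-deps #{:p2}})

;; V is [:p1 :p3 :p4]; 42 mod 3 = 0, so every peer picks :p1.
(stitch-target replica 42)
;; => :p1
```

Every peer evaluating this against the same replica and message id arrives at the same T, which is what makes the choice safe without coordination.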

diagram 13

Peers 1 - 4 form a ring. Peer 5 wants to join. Continued below…​

diagram 14

Peer 5 initiates the first phase of the join protocol. Peer 1 prepares to accept Peer 5 into the ring by adding a watch to it. Continued below…​

diagram 15

Peer 5 initiates the second phase of the join protocol. Peer 5 notifies Peer 4 as a peer to watch. At this point, a stable "mini ring" has been stitched along the outside of the cluster. We note that the link between Peer 1 and 4 is extraneous. Continued below…​

diagram 16

Peer 5 has been fully stitched into the cluster, and the ring is intact.

Dead peer removal

Peers will fail, or be shut down purposefully. Onyx needs to:

  • detect the downed peer

  • inform all peers that this peer is no longer executing its task

  • inform all peers that this peer is no longer part of the cluster

Peer Failure Detection Strategy

In a cluster of > 1 peer, when a peer dies another peer will have a watch registered on its znode to detect the ephemeral disconnect. When a peer fails (peer F), the peer watching the failed peer (peer W) needs to inform the cluster about the failure, and go watch the node that the failed node was watching (peer Z). The joining strategy that has been outlined forces peers to form a ring. A ring structure has an advantage because there is no coordination or contention as to who must now watch peer Z for failure. Peer W is responsible for watching Z, because W was watching F, and F was watching Z. Therefore, W transitively closes the ring, and W watches Z. All replicas can deterministically compute this answer without conferring with each other.
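Closing the ring is a small, deterministic map transformation. The sketch below assumes the ring is stored as a map of watcher to watched peer (as in the :pairs key); `remove-dead-peer` is a made-up name.

```clojure
;; pairs maps each watcher to the peer it watches.
(defn remove-dead-peer
  [pairs dead]
  (let [watcher (some (fn [[w watched]] (when (= watched dead) w)) pairs)
        target  (get pairs dead)]
    (cond
      ;; Nobody was watching the dead peer: just drop its own watch.
      (nil? watcher) (dissoc pairs dead)
      ;; Two-peer ring: the survivor is alone and watches no one.
      (= watcher target) (dissoc pairs dead watcher)
      ;; General case: W takes over F's watch on Z.
      :else (-> pairs (dissoc dead) (assoc watcher target)))))

;; Peer 1 watches 5, 5 watches 4, and so on around the ring.
(def ring {1 5, 5 4, 4 3, 3 2, 2 1})

(remove-dead-peer ring 5)
;; => {1 4, 4 3, 3 2, 2 1}
```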

diagram 8

The nodes form a typical ring pattern. Peer 5 dies, and its connection with ZooKeeper is severed. Peer 1 reacts by reporting Peer 5’s death to the log. Continued below…​

diagram 9

At t = 45, all of the replicas realize that Peer 5 is dead, and that Peer 1 is responsible for closing the gap by now watching Peer 4 to maintain the ring.

diagram 10

One edge case of this design is the simultaneous death of two or more consecutive peers in the ring. Suppose Peers 4 and 5 die at the exact same time. Peer 1 will signal Peer 5’s death, but Peer 5 never got the chance to signal Peer 4’s death. Continued below…​

diagram 11

Peer 1 signals Peer 5’s death, and closes the ring by adding a watch to Peer 4. Peer 4 is dead, but no one yet knows that. We circumvent this problem by first determining whether a peer is dead or not before adding a watch to it. If it’s dead, as is Peer 4 in this case, we report it and further close the ring. Continued below…​

diagram 12

Peer 1 signals Peer 4’s death, and further closes the ring by adding a watch to Peer 3. The ring is now fully intact.

Peer Failure Detection Thread

There is a window of time (between when a peer prepares to join the cluster and when its monitoring peer notifies the cluster of its presence) during which the monitoring node may fail, effectively deadlocking the new peer. This can occur because a peer will check if its monitoring peer is dead during the prepare phase - essentially performing eviction on a totally dead cluster - and may find a false positive that a node is alive when it is actually dead. The root issue is that ephemeral znodes stick around for a short period of time after the creating process goes down. The new peer must watch its monitor until it delivers the second phase message for joining - notification. When this occurs, we can stop monitoring, because the monitoring node is clearly alive. If the znode is deleted because the process exited, we can safely evict it and free the peer from deadlocking. Issue 416 found this bug, and offers more context about the specific problem that we encountered.

Garbage collection

One of the primary obstacles that this design imposes is the requirement of seemingly infinite storage. Log entries are only ever appended - never mutated. If left running long enough, ZooKeeper will run out of space. Similarly, if enough jobs are submitted and either completed or killed, the in memory replica that each peer houses will grow too large. Onyx requires a garbage collector to be periodically invoked.

When the garbage collector is invoked, two things will happen. The caller of gc will place an entry onto the log. As each peer processes this log entry, it carries out a deterministic, pure function to shrink the replica. The second thing will occur when each peer invokes the side effects for this log entry. The caller will have specified a unique ID such that it is the only one that is allowed to trim the log. The caller will take the current replica (log entry N to this log entry), and store it in an "origin" znode. Anytime that a peer boots up, it first reads out of the origin location. Finally, the caller deletes log entry N to this log entry minus 1. This has the dual effect of making new peers start up faster, as they have less of the log to play. They begin in a "hot" state.
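The log-trimming half of this can be sketched with a toy log. The entry shape (`:op`, `:job`) and the names `step`, `replay`, and `trim` are assumptions; the sketch only shows that a peer starting from the origin ends up with the same replica as one that played the full log.

```clojure
;; Toy log entries: add or remove a job from the replica.
(defn step
  [replica {:keys [op job]}]
  (case op
    :add    (update replica :jobs (fnil conj #{}) job)
    :remove (update replica :jobs disj job)))

(defn replay
  [origin log]
  (reduce step origin log))

;; Fold the first n entries into an origin snapshot and drop them.
(defn trim
  [origin log n]
  {:origin (replay origin (take n log))
   :log    (vec (drop n log))})

(def log
  [{:op :add :job :a}
   {:op :add :job :b}
   {:op :remove :job :a}
   {:op :add :job :c}])

(def trimmed (trim {} log 3))
;; => {:origin {:jobs #{:b}}, :log [{:op :add, :job :c}]}

(= (replay {} log)
   (replay (:origin trimmed) (:log trimmed)))
;; => true
```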

The garbage collector can be invoked by the public API function onyx.api/gc. Upon returning, the log will be trimmed, and the in memory replicas will be compressed.

diagram 17

A peer can start by reading out of the origin, and continue directly to a particular log location.

Command Reference

prepare-join-cluster

  • Submitter: peer (P) that wants to join the cluster

  • Purpose: determines which peer (Q) will watch P. If P is the only peer, it instantly fully joins the cluster

  • Arguments: P’s ID

  • Replica update: assoc {Q P} to :prepare key. If P is the only peer, P is immediately added to the :peers key, and no further reactions are taken

  • Side effects: Q adds a ZooKeeper watch to P’s pulse node

  • Reactions: Q sends notify-join-cluster to the log, with args P and R (R being the peer Q watches currently)

notify-join-cluster

  • Submitter: peer Q helping to stitch peer P into the cluster

  • Purpose: Adds a watch from P to R, where R is the node watched by Q

  • Arguments: P and R’s ids

  • Replica update: assoc {Q P} to :accept key, dissoc {Q P} from :prepare key

  • Side effects: P adds a ZooKeeper watch to R’s pulse node

  • Reactions: P sends accept-join-cluster to the log, with args P, Q, and R

accept-join-cluster

  • Submitter: peer P wants to join the cluster

  • Purpose: confirms that P can safely join, Q can drop its watch from R, since P now watches R, and Q watches P

  • Arguments: P, Q, and R’s ids

  • Replica update: dissoc {Q P} from :accept key, merge {Q P} and {P R} into :pairs key, conj P onto the :peers key

  • Side effects: Q drops its ZooKeeper watch from R

  • Reactions: peer P flushes its outbox of messages

  • Submitter: virtual peer P wants to become active in the cluster

  • Purpose: P affirms that its peer group has been safely stitched into the cluster

  • Arguments: P’s id

  • Replica update: conj P into :peers, remove from :orphaned-peers

  • Side effects: All virtual peers configure their workload and possibly start new tasks

  • Reactions: none

abort-join-cluster

  • Submitter: peer (Q) determines that peer (P) cannot join the cluster (P may = Q)

  • Purpose: Aborts P’s attempt at joining the cluster, erases attempt from replica

  • Arguments: P’s id

  • Replica update: Remove any :prepared or :accepted entries where P is a key’s value

  • Side effects: P optionally backs off for a period

  • Reactions: P optionally sends :prepare-join-cluster to the log and tries again

  • Submitter: peer (Q) reporting that peer P is dead

  • Purpose: removes P from :prepared, :accepted, :pairs, and/or :peers, transitions Q’s watch to R (the node P watches) and transitively closes the ring

  • Arguments: peer ID of P

  • Replica update: assoc {Q R} into the :pairs key, dissoc {P R}

  • Side effects: Q adds a ZooKeeper watch to R’s pulse node

  • Submitter: virtual peer P is leaving the cluster

  • Purpose: removes P from its task and consideration of any future tasks

  • Arguments: peer ID of P

  • Replica update: removes P from :peers

  • Side effects: All virtual peers reconfigure their workloads for possibly new tasks

  • Submitter: peer (P), who has seen the leader sentinel

  • Purpose: P wants to propagate the sentinel to all downstream tasks

  • Arguments: P’s ID (:id), the job ID (:job), and the task ID (:task)

  • Replica update: If this peer is allowed to seal, updates :sealing-task with the task ID associated with this peer’s ID.

  • Side effects: Puts the sentinel value onto the queue

  • Reactions: None

submit-job

  • Submitter: Client, via public facing API

  • Purpose: Send a catalog and workflow to be scheduled for execution by the cluster

  • Arguments: The job ID (:id), the task scheduler for this job (:task-scheduler), a topologically sorted sequence of tasks (:tasks), the catalog (:catalog), and the saturation level for this job (:saturation). Saturation denotes the number of peers this job can use, at most. This is typically Infinity, unless all catalog entries set :onyx/max-peers to an integer value. Saturation is then the sum of those numbers, since it creates an upper bound on the total number of peers that can be allocated to this task.

  • Replica update:

  • Side effects: None

  • Reactions: If the job scheduler dictates that this peer should be reallocated to this job or another job, sends :volunteer-for-task to the log

kill-job

  • Submitter: Client, via public facing API

  • Purpose: Stop all peers currently working on this job, and never allow this job’s tasks to be scheduled for execution again

  • Arguments: The job ID (:job)

  • Replica update: Adds this job id to :killed-jobs vector, removes any peers in :allocations for this job’s tasks. Switches the :peer-state for all peers executing a task for this job to :idle.

  • Side effects: If this peer is executing a task for this job, stops the current task lifecycle

  • Reactions: If this peer is executing a task for this job, reacts with :volunteer-for-task

gc

  • Submitter: Client, via public facing API

  • Purpose: Compress all peer local replicas and trim old log entries in ZooKeeper.

  • Arguments: The caller ID (:id)

  • Replica update: Clears out all data in all keys about completed and killed jobs - as if they never existed.

  • Side effects: Deletes all log entries before this command’s entry, creates a compressed replica at a special origin log location, and updates the pointer to the origin

  • Reactions: None

  • Submitter: peer (P), who has successfully started its incoming buffer

  • Purpose: Indicates that this peer is ready to receive segments as input

  • Replica update: Updates :peer-state under the :id of this peer to set its state to :active.

  • Side effects: If this task should immediately be sealed, seals this task

  • Reactions: None.

  • Submitter: This is a special entry that should never be appended to the log

  • Purpose: Perform a hard reset of the replica, replacing its entire value. This is useful if a log subscriber is reading behind a garbage collection call and tries to read a non-existent entry. The new origin can be found and its value applied locally via the subscriber.

  • Replica update: Replaces the entire value of the replica with a new value

  • Side effects: None.

  • Reactions: None.

APIs

Core API

The Core API is used to start/stop resources, jobs, and monitor job progress. It’s accessible through the onyx.api namespace.

start-env

Starts a development environment with in-memory ZooKeeper. Helpful for developing locally without needing to start any other services.

start-peer-group

Starts a resource pool to be shared across a group of peers. You should only start one peer group per physical machine.

start-peers

Starts N virtual peers to execute tasks. In a production environment, you should start by booting up N virtual peers for N cores on the physical machine. Tune performance from there.

submit-job

Submits a job to Onyx to be scheduled for execution. Takes a map with keys :catalog, :workflow, :flow-conditions, :windows, :triggers, :metadata, and :task-scheduler. Returns a map of :job-id and :task-ids, which map to a UUID and vector of maps respectively. :metadata is a map of values that must serialize to EDN. :metadata will be logged with all task output, and is useful for identifying a particular task based on something other than its name or ID.

Additionally, :metadata may optionally contain a :job-id key. When specified, this key will be used for the job ID instead of a randomly chosen UUID. Repeated submissions of a job with the same :job-id will be treated as an idempotent action. If a job with the same ID has been submitted more than once, the original task IDs associated with the catalog will be returned, and the job will not run again, even if it has been killed or completed. It is undefined behavior to submit two jobs with the same :job-id metadata whose :workflow, :catalog, :flow-conditions, etc are not equal.

await-job-completion

Given a job ID, blocks the calling thread until all the tasks for this job have been completed.

Tip
Example project: block-on-job-completion

gc

Invokes the garbage collector. Compresses the replica in ZooKeeper, freeing up storage and deleting log history. Frees up memory on the local, in-memory replica on all peers.

kill-job

Stops this job from executing, never allowing it to be run again.

Tip
Example project: kill-job

subscribe-to-log

Sends all events in the log to a core.async channel. Events are received in the order that they appeared in the log. Starts from the beginning of the log, blocking until more entries are available.

shutdown-peer

Shuts down a single peer, stopping any task that it is presently executing.

shutdown-peer-group

Shuts down the peer group, releasing any messaging resources it was holding open.

shutdown-env

Shuts down the development environment and stops the in-memory ZooKeeper.

Functions

This section outlines how Onyx programs execute behavior. Onyx uses plain Clojure functions to carry out distributed activity. You have the option of performing grouping and aggregation on each function.

Functional Transformation

A Function is a construct that takes a segment as a parameter and outputs a segment or a seq of segments. Functions are meant to literally transform a single unit of data in a functional manner. The following is an example of a function:

(defn my-inc [{:keys [n] :as segment}]
  (assoc segment :n (inc n)))

Note that you may only pass segments between functions - no other shape of data is allowed.

Tip
Example project: filtering

Function Parameterization

A function can be parameterized before a job is submitted to Onyx. The segment is always the last argument to the function. There are multiple ways to parameterize a function, and they can be used in combination.

  • Via the catalog :onyx/params entry

(def catalog
  {...
   :my/param-1 "abc"
   :my/param-2 "def"
   :onyx/params [:my/param-1 :my/param-2]
   ...})

The function is then invoked with (partial f "abc" "def"). The order is controlled by the vector of :onyx/params.

  • Via :onyx.core/params in the before-task-start lifecycle hook

(defn before-task-start-hook [event lifecycle]
  {:onyx.core/params [42]})

The function is then invoked with (partial f 42).

Using this approach "hard sets" the parameter list. Other parameters may already exist in :onyx.core/params. If you want to retain those parameters, concat them with your own and return the new value under :onyx.core/params.

  • Via the :onyx.peer/fn-params peer configuration

(def peer-opts
  {...
   :onyx.peer/fn-params {:my-fn-name [64]}})

The function is then invoked with (partial f 64).

This approach is useful for parameterizing a task regardless of which job it is in. If both :onyx.peer/fn-params and :onyx/params are set for the same task, they are concatenated together, with fn-params coming first.
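The ordering rule can be illustrated in plain Clojure. This is only a sketch: `parameterize` and `scale-and-tag` are hypothetical names invented for the illustration and are not part of Onyx. It assumes the peer-level parameters come first, followed by the catalog parameters, with the segment supplied last.

```clojure
;; Hypothetical helper, not part of Onyx: builds the function a peer would
;; end up calling, following the ordering rule described above.
(defn parameterize
  [f fn-params catalog-params]
  (apply partial f (concat fn-params catalog-params)))

;; The segment is always the last argument.
(defn scale-and-tag [factor tag segment]
  (-> segment
      (update :n * factor)
      (assoc :tag tag)))

(def task-fn
  (parameterize scale-and-tag
                [64]       ; as if taken from :onyx.peer/fn-params
                ["abc"]))  ; as if resolved through :onyx/params

(task-fn {:n 2})
;; => {:n 128, :tag "abc"}
```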

Grouping & Aggregation

Grouping ensures that "like" values are always routed to the same virtual peer, presumably to compute an aggregate. Grouping is specified inside of a catalog entry. There are two ways to group: by key of segment, or by arbitrary function. Grouping by key is a convenience that will reach into each segment and pin all segments with the same key value in the segment together. Grouping functions receive a single segment as input. The output of a grouping function is the value to group on. Grouped functions must set keys :onyx/min-peers and :onyx/flux-policy. See below for a description of these.

Group By Key

To group by a key or a vector of keys in a segment, use :onyx/group-by-key in the catalog entry:

{:onyx/name :sum-balance
 :onyx/fn :onyx.peer.kw-grouping-test/sum-balance
 :onyx/type :function
 :onyx/group-by-key :name
 :onyx/min-peers 3
 :onyx/flux-policy :continue
 :onyx/batch-size 1000}

Group By Function

To group by an arbitrary function, use :onyx/group-by-fn in the catalog entry:

{:onyx/name :sum-balance
 :onyx/fn :onyx.peer.fn-grouping-test/sum-balance
 :onyx/type :function
 :onyx/group-by-fn :onyx.peer.fn-grouping-test/group-by-name
 :onyx/min-peers 3
 :onyx/flux-policy :continue
 :onyx/batch-size 1000}

Flux Policies

Functions that use the grouping feature are presumably stateful. For this reason, unless :continue is used, once a job begins, no matter how many peers are added to the cluster, no new peers will be allocated to grouping tasks. When more peers are added after the job begins, the hashing algorithm loses its consistency, and stateful operations won’t work correctly.

Given that Onyx will not add more peers to regular grouping tasks after the job begins, we introduce a new parameter - :onyx/min-peers. This should be set to an integer that indicates the minimum number of peers that must be allocated to this task before the job can begin. Onyx may schedule more than the minimum number that you set. You can create an upper bound by also using :onyx/max-peers.
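To see why a changing peer count matters for stateful grouping, consider a naive hash-and-modulo router. This is an illustration only: the text does not describe Onyx’s actual hashing scheme, and `route` is a hypothetical function written for this sketch.

```clojure
;; Illustration only: a naive hash-mod router, not Onyx's implementation.
(defn route
  "Picks a peer index for a segment by hashing its grouping value."
  [group-fn n-peers segment]
  (mod (hash (group-fn segment)) n-peers))

(def segments [{:name "Mike" :amount 10}
               {:name "Dorrene" :amount 20}
               {:name "Mike" :amount 5}])

;; With a fixed peer count, equal keys always land on the same peer.
(map (partial route :name 3) segments)

;; With a different peer count, the same key may map to a different peer,
;; which would split any state already accumulated for that key.
(map (partial route :name 4) segments)
```

Both "Mike" segments always receive the same index within one call, but nothing guarantees that the index is the same across the two peer counts.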

Tip
Example project: max-peers.

One concern that immediately needs to be addressed is what happens if a peer on a grouping task leaves the cluster after the job has begun. Clearly, removing a peer from a grouping task also breaks the consistent hashing algorithm that supports statefulness. The policy that is enforced is configurable, and must be chosen by the developer. We offer three policies, outlined below.

Continue Policy

When :onyx/flux-policy is set to :continue on a catalog entry, the hashing algorithm may be inconsistent. Peers can leave or join a task at any point in time. This is desirable for streaming jobs where the data is theoretically infinite, or for jobs whose tasks benefit from grouping but are not stateful.

Kill Policy

When :onyx/flux-policy is set to :kill, the job is killed and all peers abort execution of the job if a peer leaves the grouping task. Some jobs cannot compute correct answers if there is a shift in the hashing algorithm’s consistency. An example of this is a word count batch job.

Recover Policy

When :onyx/flux-policy is set to :recover, the job continues as is if any peers abort execution of the task. If any other peers are available, they will be added to this task to progressively meet the :onyx/min-peers number of peers concurrently working on this task.

Batch Functions

Sometimes you can perform a function more efficiently over a batch of segments than over one segment at a time, such as when writing segments to a database in a non-output task. You can receive the entire batch of segments as an argument to your task by setting :onyx/batch-fn? to true in the catalog entry for your function. Your function must return a sequence with the same number of elements as its incoming batch. The elements are then matched up positionally to pair parent segments with their outgoing child segments. Each element in the output may be either a single segment or a vector of segments, as normal.

An example catalog entry:

{:onyx/name :inc
 :onyx/fn :onyx.peer.batch-function-test/my-inc
 :onyx/type :function
 :onyx/batch-fn? true
 :onyx/batch-size batch-size}

And an example catalog function to correspond to this entry:

(defn my-inc [segments]
  (map #(update-in % [:n] inc) segments))

The default value for this option is false.
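As a further sketch of where a batch function pays off, the example below resolves every id in the batch with a single lookup instead of one lookup per segment. `lookup-names` is a hypothetical stand-in for a bulk database call, and `enrich-batch` is an invented name; neither comes from Onyx.

```clojure
;; Hypothetical stand-in for one round trip that resolves many ids at once.
(defn lookup-names [ids]
  (let [db {1 "Ada" 2 "Grace" 3 "Edsger"}]
    (mapv db ids)))

;; Batch function: one bulk lookup, then one output element per input
;; segment, in the same order, so parents and children pair up positionally.
(defn enrich-batch [segments]
  (let [user-names (lookup-names (map :id segments))]
    (mapv (fn [segment user-name] (assoc segment :name user-name))
          segments
          user-names)))

(enrich-batch [{:id 3} {:id 1}])
;; => [{:id 3, :name "Edsger"} {:id 1, :name "Ada"}]
```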

Leaf Functions

Sometimes you’re going to want a node in your workflow with no outgoing connections that doesn’t perform I/O against a database. You can do this by setting :onyx/type to :output, :onyx/medium to :function, and :onyx/plugin to :onyx.peer.function/function. Then you can specify an :onyx/fn pointing to a regular Clojure function. For example:

{:onyx/name :leaf-task
 :onyx/fn ::add-to-results
 :onyx/plugin :onyx.peer.function/function
 :onyx/medium :function
 :onyx/type :output
 :onyx/batch-size 20}

Flow Conditions

This section covers flow conditions. Flow conditions are used for isolating logic about whether or not segments should pass through different tasks in a workflow, and support a rich degree of composition with runtime parameterization.

Summary

Workflows specify the structure of your computation as a directed, acyclic graph. A workflow describes all possible routes that a segment can take as it enters your workflow. On the other hand, we often have the need to specify how an individual segment moves throughout your workflow. Many times, a segment conditionally moves from one task to another. This is a concept that Onyx takes apart and turns into its own idea, independent of the rest of your computation. They’re called Flow Conditions. It should be mentioned straight away that Flow Conditions are entirely optional, and your program can ignore them entirely if you’d like. Omitting them leads to the default behavior, which sends a segment to all immediate downstream tasks.

Motivating Example

The easiest way to learn how to use flow conditions is to see an example. Suppose we have the following workflow snippet:

[[:input-stream :process-children]
 [:input-stream :process-adults]
 [:input-stream :process-female-athletes]
 [:input-stream :process-everyone]
 ...]

This workflow takes some input (presumably a stream of people) and directs segments to four possible tasks - :process-children, :process-adults, :process-female-athletes, and :process-everyone. Suppose we want to conditionally direct a segment to zero or more of these tasks, depending on some predicates. We use flow conditions to carry out this work. Flow conditions are their own data structure, which is bundled along with the workflow and catalog and passed to onyx.api/submit-job (with key :flow-conditions). Here’s an example of what a flow conditions data structure would look like for our proposed workflow:

[{:flow/from :input-stream
  :flow/to [:process-children]
  :my/max-child-age 17
  :flow/predicate [:my.ns/child? :my/max-child-age]
  :flow/doc "Emits segment if this segment is a child."}

 {:flow/from :input-stream
  :flow/to [:process-adults]
  :flow/predicate :my.ns/adult?
  :flow/doc "Emits segment if this segment is an adult."}

 {:flow/from :input-stream
  :flow/to [:process-female-athletes]
  :flow/predicate [:and :my.ns/female? :my.ns/athlete?]
  :flow/doc "Emits segment if this segment is a female athlete."}

 {:flow/from :input-stream
  :flow/to [:process-everyone]
  :flow/predicate :my.ns/constantly-true
  :flow/doc "Always emit this segment"}]

The basic idea is that every entry in the Flow Conditions data structure denotes a relationship between a task and its downstream tasks. :flow/from indicates the task that the segment is leaving, and :flow/to indicates the tasks that the segment should be sent to if the predicate evaluates to true. The predicate is denoted by :flow/predicate, which is a keyword or sequence of keywords that are resolved to a function. Later in this section, we’ll cover how exactly the predicate function is constructed.

There is one flow conditions data structure per job - that is, there is one vector of maps. The order that you specify the flow conditions in matters. More on that later in this section.

Tip
Example project: flow-combine

Predicate Function Signatures

A predicate function is a Clojure function that takes at least four parameters - a context map, the old segment, the new segment, and the collection of all new segments produced from the old segment. Predicates can take parameters at runtime. They will be appended to the end of the function invocation. See Predicate Parameters for further discussion of how to use runtime parameters.

Predicates for the above examples can be seen below:

(defn child? [event old-segment new-segment all-new max-age]
  (<= (:age new-segment) max-age))

(defn adult? [event old-segment new-segment all-new]
  (>= (:age new-segment) 18))

(defn female? [event old-segment new-segment all-new]
  (= (:gender new-segment) "Female"))

(defn athlete? [event old-segment new-segment all-new]
  (= (:job new-segment) "athlete"))

(def constantly-true (constantly true))

Predicate Parameters

Predicate functions can take parameters at runtime. In the first flow condition above, we use the parameter :my/max-child-age and set its value to 17. We pass this value to the predicate by surrounding it with brackets, as in: [:my.ns/child? :my/max-child-age]. The parameters are appended to the end of the function call to the predicate. See Predicate Function Signatures in this section for the arguments that are passed into the predicate on every invocation.
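The parameter lookup can be sketched in plain Clojure. This is an approximation for illustration: `predicates` and `run-predicate` are hypothetical, and the lookup table merely stands in for however Onyx resolves the predicate keyword to a function.

```clojure
(defn child? [event old-segment new-segment all-new max-age]
  (<= (:age new-segment) max-age))

(def flow-condition
  {:flow/from :input-stream
   :flow/to [:process-children]
   :my/max-child-age 17
   :flow/predicate [:my.ns/child? :my/max-child-age]})

;; Hypothetical lookup table standing in for keyword-to-function resolution.
(def predicates {:my.ns/child? child?})

(defn run-predicate
  "Looks up each parameter keyword in the flow condition map and appends
  the values after the four standard predicate arguments."
  [condition event old-segment new-segment all-new]
  (let [form (:flow/predicate condition)
        [pred-kw & param-kws] (if (keyword? form) [form] form)
        params (map condition param-kws)]
    (apply (predicates pred-kw) event old-segment new-segment all-new params)))

(run-predicate flow-condition {} {:age 9} {:age 9} [{:age 9}])
;; => true
(run-predicate flow-condition {} {:age 30} {:age 30} [{:age 30}])
;; => false
```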

Key Exclusion

Sometimes, the decision of whether to allow a segment to pass through to the next task depends on some side effects that were a result of the original segment transformation. Onyx allows you to handle this case by adding extra keys to the segment that comes out of the transformation function. These extra keys are visible in your predicate function, and then stripped off before the segment is sent to the next task. You can indicate these "extra keys" by setting :flow/exclude-keys to a vector of keys.

For example, if we had the following transformation function:

(defn my-function [x]
  (assoc x :result 42 :side-effects-result :blah))

Our predicate for flow conditions might need to use the :side-effects-result to make a decision. We don’t want to actually send that information to the next task, though - so we set :flow/exclude-keys to [:side-effects-result] to make it disappear after the predicate result has been realized.

{:flow/from :input-stream
 :flow/to [:process-adults]
 :flow/predicate :my.ns/adult?
 :flow/exclude-keys [:side-effects-result]
 :flow/doc "Emits segment if this segment is an adult."}

Tip
Example project: flow-exclude-keys
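The order of operations described above (predicate first, key removal second) can be sketched as follows. `route-segment` and `side-effect-ok?` are hypothetical names for this illustration; the sketch models a single flow condition only.

```clojure
(defn my-function [x]
  (assoc x :result 42 :side-effects-result :blah))

;; The predicate can still see the extra key.
(defn side-effect-ok? [event old-segment new-segment all-new]
  (= :blah (:side-effects-result new-segment)))

(defn route-segment
  "Hypothetical helper: returns the segment to emit downstream, or nil
  when the predicate fails. Excluded keys are removed only after the
  predicate has been evaluated."
  [predicate exclude-keys old-segment new-segment]
  (when (predicate {} old-segment new-segment [new-segment])
    (apply dissoc new-segment exclude-keys)))

(route-segment side-effect-ok?
               [:side-effects-result]
               {:id 1}
               (my-function {:id 1}))
;; => {:id 1, :result 42}
```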

Predicate Composition

One extraordinarily powerful feature of Flow Conditions is its composition characteristics. Predicates can be composed with logical and, or, and not. We use composition to check if the segment is both female and an athlete in [:and :my.ns/female? :my.ns/athlete?]. Logical function calls must be surrounded with brackets, and may be nested arbitrarily. Functions inside of logical operator calls may be parameterized, as in [:and :my.ns/female? [:my.ns/child? :my/max-child-age]]. Parameters may not specify logical functions.

Tip
Example project: flow-predicate-composition
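One way to picture composition is as a small recursive compiler from the :flow/predicate form to a function. The sketch below is not Onyx’s implementation: `build-pred` and the `predicates` lookup table are hypothetical, and the operator keywords :or and :not are assumed to follow the same pattern as the :and shown in the example.

```clojure
(defn female? [event old-segment new-segment all-new]
  (= (:gender new-segment) "Female"))

(defn athlete? [event old-segment new-segment all-new]
  (= (:job new-segment) "athlete"))

(defn child? [event old-segment new-segment all-new max-age]
  (<= (:age new-segment) max-age))

;; Hypothetical lookup table standing in for keyword-to-function resolution.
(def predicates {:my.ns/female? female?
                 :my.ns/athlete? athlete?
                 :my.ns/child? child?})

(defn build-pred
  "Compiles a :flow/predicate form into a function of the four standard
  predicate arguments. `condition` supplies runtime parameter values."
  [condition form]
  (cond
    (keyword? form)
    (predicates form)

    (= :and (first form))
    (let [ps (map #(build-pred condition %) (rest form))]
      (fn [& args] (every? #(apply % args) ps)))

    (= :or (first form))
    (let [ps (map #(build-pred condition %) (rest form))]
      (fn [& args] (boolean (some #(apply % args) ps))))

    (= :not (first form))
    (let [p (build-pred condition (second form))]
      (fn [& args] (not (apply p args))))

    :else ; parameterized predicate: [pred-kw param-kw ...]
    (let [[pred-kw & param-kws] form
          params (map condition param-kws)]
      (fn [& args] (apply (predicates pred-kw) (concat args params))))))

(def condition {:my/max-child-age 17})

((build-pred condition [:and :my.ns/female? :my.ns/athlete?])
 {} nil {:gender "Female" :job "athlete"} [])
;; => true

((build-pred condition [:and :my.ns/female? [:my.ns/child? :my/max-child-age]])
 {} nil {:gender "Female" :age 30} [])
;; => false
```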

Match All/None

Sometimes, you want a flow condition that emits a value to all tasks if the predicate is true. You can use a shorthand to emit to all downstream tasks:

{:flow/from :input-stream
 :flow/to :all
 :flow/short-circuit? true
 :flow/predicate :my.ns/adult?}

Similarly, sometimes you want to emit to no downstream tasks:

{:flow/from :input-stream
 :flow/to :none
 :flow/short-circuit? true
 :flow/predicate :my.ns/adult?}

If a flow condition specifies :all as its :flow/to, it must come before any other flow conditions. If a flow condition specifies :none as its :flow/to, it must come directly behind an :all condition, or first if there is no :all condition. This is because of the semantics of short circuiting. We’ll discuss what short circuiting means later in this section.

:flow/to set to :all or :none must always set :flow/short-circuit? to true.

:flow/from may be set to :all. This directs all immediate upstream links to pass segments to this task’s flow condition. :flow/from as :all does not impose order constraints as :flow/to set to :all does.

Short Circuiting

If multiple flow condition entries evaluate to a true predicate, their :flow/to values are unioned (duplicates are ignored), as are their :flow/exclude-keys. Sometimes you don’t want this behavior, and you want to specify exactly the downstream tasks to emit to - and not check any more flow condition entries. You can do this with :flow/short-circuit? set to true. Any entry that has :flow/short-circuit? set to true must come before any entries for a task that have it set to false or nil.

Tip
Example project: flow-short-circuit
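The union and short-circuit rules can be sketched as a reduction over the ordered entries. This is a simplification for illustration: `choose-routes` is hypothetical, the :pred key stands in for an already resolved :flow/predicate that takes just the segment, and :all, :none, and :flow/exclude-keys are not modelled.

```clojure
(defn choose-routes
  "Hypothetical evaluator. Walks the flow conditions in order. A matching
  short-circuit entry decides the routes by itself and stops evaluation;
  otherwise the :flow/to values of all matching entries are unioned."
  [conditions segment]
  (reduce (fn [routes condition]
            (cond
              (not ((:pred condition) segment)) routes
              (:flow/short-circuit? condition) (reduced (set (:flow/to condition)))
              :else (into routes (:flow/to condition))))
          #{}
          conditions))

(def conditions
  [{:flow/to [:process-vips] :flow/short-circuit? true :pred :vip?}
   {:flow/to [:process-adults] :pred #(>= (:age %) 18)}
   {:flow/to [:process-everyone :process-adults] :pred (constantly true)}])

(choose-routes conditions {:age 40})
;; => #{:process-adults :process-everyone}

(choose-routes conditions {:age 40 :vip? true})
;; => #{:process-vips}
```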

Exceptions

Flow Conditions give you leverage for handling exceptions without miring your code in try/catch logic. If an exception is thrown from an Onyx transformation function, you can capture it from within your flow conditions by setting :flow/thrown-exception? to true. Its default value is false. If an exception is thrown, only flow conditions with :flow/thrown-exception? set to true will be evaluated. The value passed to the predicate in place of the new segment will be the exception object that was thrown. Exception flow conditions must have :flow/short-circuit? set to true. Note that exceptions don’t serialize. This feature is meant to be used in conjunction with Post-transformations and Actions for sending exception values to downstream tasks.

{:flow/from :input-stream
 :flow/to [:error-task]
 :flow/short-circuit? true
 :flow/thrown-exception? true
 :flow/predicate :my.ns/handle-error?}

And the predicate might be:

(defn handle-error? [event old ex-obj all-new]
  (= (type ex-obj) java.lang.NullPointerException))

This flow condition only restricts the flow from :input-stream to :error-task when an exception is thrown - see the discussion of Short Circuiting above. When an exception is not thrown, the default behaviour applies: any later flow conditions are evaluated, and if there are no other flow conditions for that task, the segment flows through to all downstream tasks.

Post-transform

Post-transformations are an extension provided to handle segments that cause exceptions to arise. If a flow condition has :flow/thrown-exception? set to true, it can also set :flow/post-transform to a keyword. This keyword must name a fully namespace-qualified function on the classpath. This function will be invoked with three parameters: the event map, the segment that caused the exception, and the exception object. The result of this function, which must be a segment, will be passed to the downstream tasks. This allows you to come up with a reasonable value to pass downstream when you encounter an exception, since exceptions don’t serialize anyway. :flow/exclude-keys will be applied to the resulting transformed segment.

Example:

{:flow/from :input-stream
 :flow/to [:error-task]
 :flow/short-circuit? true
 :flow/thrown-exception? true
 :flow/post-transform :my.ns/post-transform
 :flow/predicate :my.ns/handle-error?}

And an example post-transform function might be:

(defn post-transform [event segment exception-obj]
  {:error :my-exception-value})

Actions

After a set of flow conditions has been evaluated for a segment, you usually want to send the segment downstream to the next set of tasks. Other times, you want to retry to process the segment because something went wrong. Perhaps a database connection wasn’t available, or an email couldn’t be sent.

Onyx provides the :flow/action key on flow conditions to accomplish this. By setting :flow/action to :retry, a segment will expire from the internal pool of pending messages and be automatically retried from its input task. If any of the :flow/action values from the matching flow conditions is :retry, the segment will be retried and will not be sent downstream. This parameter is optional, and its default value is nil. nil will cause the segment to be sent to all downstream tasks that were selected from evaluating the flow conditions. Any flow condition clauses with :flow/action set to :retry must also have :flow/short-circuit? set to true, and :flow/to set to :none.

Here’s a quick example:

[{:flow/from :input-stream
  :flow/to :none
  :flow/short-circuit? true
  :flow/predicate :my.ns/adult?
  :flow/action :retry}

 {:flow/from :input-stream
  :flow/to [:task-a]
  :flow/predicate :my.ns/child?}]

Predicate Exceptions

It’s possible for an exception to be thrown during the evaluation of a predicate inside a flow condition. If left unhandled, the exception will be raised out of the calling context, making it available to be caught by Lifecycle exception handlers.

This is occasionally problematic because lifecycle exception handlers, by design, do not allow you to send segments to downstream tasks. When this is required, add the :flow/predicate-errors-to key to your flow condition. This key acts exactly the same as :flow/to, except that it is only used when a predicate throws an exception.

This feature must be used in conjunction with :flow/post-transform to make the value serializable for downstream peers to consume. The other difference is that the second argument given to the post transformation function will not be a segment - it will be a map of keys :old and :new representing the segment before and after :onyx/fn was applied to it.
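A post-transform function for this case might look like the sketch below. The function name and the keys of the returned segment are hypothetical; only the argument shapes (event map, a map with :old and :new, and the exception object) follow the description above.

```clojure
(defn predicate-error->segment
  "Hypothetical post-transform for predicate errors. The second argument
  is a map with :old and :new rather than a single segment."
  [event {old-segment :old new-segment :new} exception-obj]
  ;; Return plain data only, so the result can be serialized downstream.
  {:error (.getMessage ^Throwable exception-obj)
   :before old-segment
   :after new-segment})

(predicate-error->segment {}
                          {:old {:n 1} :new {:n 2}}
                          (ex-info "predicate failed" {}))
;; => {:error "predicate failed", :before {:n 1}, :after {:n 2}}
```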

Messaging

Background

The messaging layer takes care of the direct peer to peer transfer of segment batches, and coordination barriers.

Messaging Implementations

The Onyx messaging implementation is pluggable and alternative implementations can be selected via the :onyx.messaging/impl Peer Configuration option.

Aeron Messaging

Owing to Aeron’s high throughput and low latency, Aeron is the default Onyx messaging implementation. There are a few relevant considerations when using the Aeron implementation.

Port Use

The Aeron messaging implementation will use the port configured via :onyx.messaging/peer-port. This UDP port must be unfirewalled.

Media Driver

Aeron requires a media driver to be used on each node. Onyx provides an embedded media driver for local testing; however, use of the embedded driver is not recommended in production. The embedded driver can be configured via the :onyx.messaging.aeron/embedded-driver? Peer Configuration option.

When using Aeron messaging in production, a media driver should be created in another Java process. You can do this via the following code snippet, or by using the Aeron distribution.

(ns your-app.aeron-media-driver
  (:require [clojure.core.async :refer [chan <!!]])
  (:import [io.aeron.driver MediaDriver MediaDriver$Context]))

(defn -main [& args]
  ;; Launch a standalone media driver using the default context settings.
  (let [ctx (MediaDriver$Context.)
        media-driver (MediaDriver/launch ctx)]
    (println "Launched the Media Driver. Blocking forever...")
    ;; Block on a channel that never receives a value to keep the process alive.
    (<!! (chan))))

Configuration Options

Aeron is independently configurable via Java properties (e.g. JAVA_OPTS="-Daeron.mtu.length=16384"). Configuration of these may cause different performance characteristics, and certain options may need to be configured in order to communicate large segments between peers.

Documentation for these configuration options can be found in Aeron’s documentation.

Lifecycles

Lifecycles are a feature that allows you to control code that executes at particular points during task execution on each peer. Lifecycles are data driven and composable.

Summary

There are several interesting points at which to execute arbitrary code during a task in Onyx. Onyx lets you plug in functions that it calls before a task, after a task, before a batch, and after a batch on every peer. Additionally, there is another lifecycle hook that allows you to delay starting a task in case you need to do some work like acquiring a lock under contention. A peer’s lifecycle is isolated to itself, and lifecycles never coordinate across peers. Usage of lifecycles is entirely optional. Lifecycle data is submitted as a data structure at job submission time.

Lifecycle Phases

Before task set up

A function that takes two arguments - an event map, and the matching lifecycle map. Must return a boolean value indicating whether to start the task or not. If false, the process backs off for a preconfigured amount of time and calls this function again. Useful for lock acquisition. This function is called prior to any processes inside the task becoming active.

Before task execution

A function that takes two arguments - an event map, and the matching lifecycle map. Must return a map that is merged back into the original event map. This function is called after processes in the task are launched, but before the peer listens for incoming segments from other peers.

Before batch start

A function that takes two arguments - an event map, and the matching lifecycle map. Must return a map that is merged back into the original event map. This function is called prior to receiving a batch of segments from the reading function.

After reading batch of segments

A function that takes two arguments - an event map, and the matching lifecycle map. Must return a map that is merged back into the original event map. This function is called immediately after a batch of segments has been read by the peer. The segments are available in the event map by the key :onyx.core/batch.

After apply fn batch

A function that takes two arguments - an event map, and the matching lifecycle map. Must return a map that is merged back into the original event map. This function is called after the :onyx/fn has been applied to the input segments and created output segments.

After batch

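A function that takes two arguments - an event map, and the matching lifecycle map. Must return a map that is merged back into the original event map. This function is called once after each batch.

After task stop

A function that takes two arguments - an event map, and the matching lifecycle map. Must return a map that is merged back into the original event map. This function is called before the peer relinquishes its task. No more segments will be received.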

Handle Exception

One or more lifecycle exception handlers may be defined. If a handler is present and an exception is thrown during any lifecycle execution except after-task-stop, the exception will be caught and passed to this function. See the details on the Onyx cheat sheet.

Example

Let’s work with an example to show how lifecycles work. Suppose you want to print out a message at all the possible lifecycle hooks. You’d start by defining 8 functions for the 8 hooks:

(ns my.ns)

(defn start-task? [event lifecycle]
  (println "Executing once before the task starts.")
  true)

(defn before-task-start [event lifecycle]
  (println "Executing once before the task starts.")
  {})

(defn after-task-stop [event lifecycle]
  (println "Executing once after the task is over.")
  {})

(defn before-batch [event lifecycle]
  (println "Executing once before each batch.")
  {})

(defn after-read-batch [event lifecycle]
  (println "Executing once after this batch has been read.")
  {})

(defn after-apply-fn [event lifecycle]
  (println "Executing once after the onyx/fn has been called on the input segments.")
  {})

(defn after-batch [event lifecycle]
  (println "Executing once after each batch.")
  {})

(defn handle-exception [event lifecycle lifecycle-phase e]
  (println "Caught exception: " e)
  (println "Returning :restart, indicating that this task should restart.")
  :restart)

Notice that all lifecycle functions return maps except start-task? and handle-exception. The returned map is merged back into the event parameter that you received. handle-exception instead returns a keyword, such as :restart above, that tells Onyx how to proceed. start-task? is a boolean function that allows you to block and back off if you don’t want to start the task quite yet. This function will be called periodically as long as false is returned. If more than one start-task? is specified in your lifecycles, they must all return true for the task to begin. start-task? is invoked before before-task-start.

Next, define a map that wires all these functions together by mapping predefined keywords to the functions:

(def calls
  {:lifecycle/start-task? start-task?
   :lifecycle/before-task-start before-task-start
   :lifecycle/before-batch before-batch
   :lifecycle/after-read-batch after-read-batch
   :lifecycle/after-apply-fn after-apply-fn
   :lifecycle/after-batch after-batch
   :lifecycle/after-task-stop after-task-stop
   :lifecycle/handle-exception handle-exception})

Each of these 8 keys maps to a function. All of these keys are optional, so you can mix and match depending on which functions you actually need to use.

Finally, create a lifecycle data structure by pointing :lifecycle/calls to the fully qualified namespaced keyword that represents the calls map that we just defined. Pass it to your onyx.api/submit-job call:

(def lifecycles
  [{:lifecycle/task :my-task-name-here
    :lifecycle/calls :my.ns/calls
    :lifecycle/doc "Test lifecycles and print a message at each stage"}])

(onyx.api/submit-job
  peer-config
  {...
   :lifecycles lifecycles
   ...})

It is also possible to have a lifecycle apply to every task in a workflow by specifying :lifecycle/task :all. This is useful for instrumenting your tasks with metrics, error handling, or debugging information.

(def lifecycles
  [{:lifecycle/task :all
    :lifecycle/calls :my.ns/add-metrics
    :lifecycle/doc "Instruments all tasks in a workflow with the example function 'add-metrics'"}])

You can supply as many sets of lifecycles as you want. They are invoked in the order that they are supplied in the vector, giving you a predictable sequence of calls. Be sure that all the keyword symbols and functions are required onto the classpath for the peer that will be executing them.
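The ordering and merging behaviour can be sketched in plain Clojure. `run-phase` is a hypothetical runner written for this illustration, and the inline :calls key is a simplification: in a real job, :lifecycle/calls is a keyword that refers to a calls map rather than the map itself.

```clojure
(defn run-phase
  "Hypothetical runner: calls each lifecycle's function for `phase` in
  vector order and merges every returned map into the event map."
  [event phase lifecycles]
  (reduce (fn [ev lifecycle]
            (if-let [f (get-in lifecycle [:calls phase])]
              (merge ev (f ev lifecycle))
              ev))
          event
          lifecycles))

(def example-lifecycles
  [{:lifecycle/doc "first"
    :calls {:lifecycle/before-batch
            (fn [event lifecycle] {:seen [:first]})}}
   {:lifecycle/doc "second"
    :calls {:lifecycle/before-batch
            (fn [event lifecycle] {:seen (conj (:seen event) :second)})}}])

(run-phase {:onyx.core/batch []} :lifecycle/before-batch example-lifecycles)
;; => {:onyx.core/batch [], :seen [:first :second]}
```

Because each function receives the event map as updated by the functions before it, the second lifecycle can build on what the first one added.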

Tip
Example project: lifecycles

Windowing and Aggregation

This section discusses a feature called windowing. Windows allow you to group and accrue data into possibly overlapping buckets. Windows are intimately related to the Triggers feature. When you’re finished reading this section, head over to the Triggers chapter next.

Summary

Windowing splits up a possibly unbounded data set into finite, possibly overlapping portions. Windows allow us to create aggregations over distinct portions of a stream, rather than stalling and waiting for the entire data set to arrive. In Onyx, Windows strictly describe how data is accrued. When you want to do something with the windowed data, you use a Trigger. See the chapter on Triggers for more information. Onyx’s windowing mechanisms are strong enough to handle stream disorder. If your data arrives in an order that isn’t "logical" (for example, :event-time keys moving backwards in time), Onyx can sort out the appropriate buckets to put the data in.

Windows can be attached to any :onyx/type, however they are often best used with :onyx/type :reduce.

Reduce Task Type

When windows are used on a :onyx/type :reduce task, messages transformed with :onyx/fn will not be emitted to downstream tasks. When windows are used with :onyx/type :reduce on an intermediate task, a trigger must be included with :trigger/emit supplied. :trigger/emit will ensure that triggered messages are emitted to downstream tasks in lieu of the transformed segments.

When :onyx/type :reduce is used on a terminal task, :onyx/plugin is no longer mandatory.

Tip
Example project: terminal-reduce-task

Window Types

The topic of windows has been widely explored in the literature. There are different types of windows. Currently, Onyx supports Fixed, Sliding, Global, and Session windows. We will now explain the supported window types.

Fixed Windows

Fixed windows, sometimes called Tumbling windows, span a particular range and do not slide. That is, fixed windows never overlap one another. Consequently, a data point will fall into exactly one instance of a window (often called an extent in the literature). As it turns out, fixed windows are a special case of sliding windows where the range and slide values are equal. You can see a visual below of how this works, where the |--| drawings represent extents. Each window is of range 5. Time runs horizontally, while the right-hand side features the extent bound running vertically. The first extent captures all values between 0 and 4.99999…​

1, 5, 10, 15, 20, 25, 30, 35, 40
|--|                                [0  - 4]
   |--|                             [5  - 9]
      |---|                         [10 - 14]
          |---|                     [15 - 19]
              |---|                 [20 - 24]
                  |---|             [25 - 29]
                      |---|         [30 - 34]
                          |---|     [35 - 39]

Example:

{:window/id :collect-segments
 :window/task :identity
 :window/type :fixed
 :window/aggregation :onyx.windowing.aggregation/count
 :window/window-key :event-time
 :window/range [5 :minutes]}
Tip
Example project: fixed-windows
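
The extent that a value falls into can be worked out with integer arithmetic. The sketch below is illustrative only and is not Onyx's implementation. fixed-extent is a hypothetical helper, and it assumes values at or above the minimum value.

```clojure
(defn fixed-extent
  "Returns [lower upper) bounds of the fixed window extent containing v.
  The lower bound is inclusive and the upper bound is exclusive.
  Assumes v >= min-value."
  [range-size min-value v]
  (let [index (quot (- v min-value) range-size)
        lower (+ min-value (* index range-size))]
    [lower (+ lower range-size)]))

;; With a range of 5 and a minimum value of 0:
(fixed-extent 5 0 4)  ;; => [0 5]
(fixed-extent 5 0 7)  ;; => [5 10]
```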

Sliding Windows

In contrast to fixed windows, sliding windows allow extents to overlap. When a sliding window is specified, we have to give it a range for which the window spans, and a slide value for how long to wait between spawning a new window extent. Every data point will fall into exactly range / slide number of window extents. We draw out what this looks like for a sliding window with range 15 and slide 5:

1, 5, 10, 15, 20, 25, 30, 35, 40
|---------|                         [0  - 14]
   |----------|                     [5  - 19]
      |-----------|                 [10 - 24]
          |-----------|             [15 - 29]
              |-----------|         [20 - 34]
                  |-----------|     [25 - 39]

Example:

{:window/id :collect-segments
 :window/task :identity
 :window/type :sliding
 :window/aggregation :onyx.windowing.aggregation/conj
 :window/window-key :event-time
 :window/range [15 :minutes]
 :window/slide [5 :minute]}
Tip
Example project: sliding-windows
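
To make the overlap concrete, the sketch below lists every extent that contains a given value. It is an illustration under stated assumptions, not Onyx's implementation: sliding-extents is a hypothetical helper, extents are assumed to start at multiples of the slide counted from a minimum value, and the value is assumed to be at or above that minimum.

```clojure
(defn sliding-extents
  "Returns, in ascending order, the [lower upper) bounds of every sliding
  window extent that contains v. Assumes v >= min-value."
  [range-size slide min-value v]
  (let [latest-start (+ min-value (* slide (quot (- v min-value) slide)))]
    (->> (iterate #(- % slide) latest-start)
         ;; Walk backwards through extent starts while the extent still covers v.
         (take-while #(and (>= % min-value) (< v (+ % range-size))))
         (map (fn [lower] [lower (+ lower range-size)]))
         reverse
         vec)))

;; Range 15, slide 5: the value 17 lands in 15 / 5 = 3 extents.
(sliding-extents 15 5 0 17)  ;; => [[5 20] [10 25] [15 30]]
```

Under these assumptions, values close to the minimum fall into fewer extents, because no extent starts before the minimum value. For example, 3 falls only into the extent starting at 0.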

Global Windows

Global windows are perhaps the easiest to understand. With global windows, there is exactly one window extent that matches all data that enters it. This lets you capture events that span over an entire domain of time. Global windows are useful for modeling batch or timeless computations.

<- Negative Infinity                Positive Infinity ->
|-------------------------------------------------------|

Example:

{:window/id :collect-segments
 :window/task :identity
 :window/type :global
 :window/aggregation :onyx.windowing.aggregation/count}
Tip
Example project: global-windows

Session Windows

Session windows are windows that dynamically resize their upper and lower bounds in reaction to incoming data. Sessions capture a time span of activity for a specific key, such as a user ID. If no activity occurs within a timeout gap, the session closes. If an event occurs within the bounds of a session, the window size is fused with the new event, and the session is extended by its timeout gap either in the forward or backward direction.

For example, if events with the same session key occurred at 5, 7, and 20, and the session window used a timeout gap of 5, the windows would look like the following:

1, 5, 10, 15, 20, 25, 30, 35, 40
   |-|                           [5 - 7]
              |                  [20 - 20]

Windows that aren’t fused to anything are single points in time (see 20). If an event occurs within the timeout gap before or after another event on the timeline, the two events fuse, as 5 and 7 do.

Example:

{:window/id :collect-segments
 :window/task :identity
 :window/type :session
 :window/aggregation :onyx.windowing.aggregation/conj
 :window/window-key :event-time
 :window/timeout-gap [5 :minutes]}
Tip
Example project: session-windows
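
The fusing behavior can be sketched as a fold over sorted event times for a single session key. This is an illustration, not Onyx's implementation. sessionize is a hypothetical helper, and treating a distance exactly equal to the timeout gap as "within the gap" is an assumption.

```clojure
(defn sessionize
  "Fuses event times for one session key into [lower upper] sessions.
  Two events belong to the same session when they are at most
  timeout-gap apart."
  [timeout-gap times]
  (reduce (fn [sessions t]
            (let [[lower upper] (peek sessions)]
              (if (and upper (<= (- t upper) timeout-gap))
                ;; Within the gap: extend the most recent session.
                (conj (pop sessions) [lower t])
                ;; Outside the gap: start a new single-point session.
                (conj sessions [t t]))))
          []
          (sort times)))

(sessionize 5 [5 7 20])  ;; => [[5 7] [20 20]]
```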

Units

Onyx allows you to specify range and slide values in different magnitudes of units, so long as the units can be converted to the same unit in the end. For example, you can specify the range in minutes, and the slide in seconds. Any value that requires units takes a vector of two elements. The first element represents the value, and the second the unit. For example, window specifications denoting range and slide might look like:

{:window/range [1 :minute]
 :window/slide [30 :seconds]}

See the information model for all supported units. You can use a singular form (e.g. :minute) instead of the plural (e.g. :minutes) where it makes sense for readability.
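
As a rough illustration of what converting to the same unit means, the sketch below normalizes a few time units to milliseconds. The unit->ms table and both helper functions are hypothetical and cover only a subset of units chosen for this example. Consult the information model for the units Onyx actually supports.

```clojure
;; Hypothetical conversion table; singular and plural forms are both accepted.
(def unit->ms
  {:millisecond 1,   :milliseconds 1
   :second 1000,     :seconds 1000
   :minute 60000,    :minutes 60000
   :hour 3600000,    :hours 3600000})

(defn ->ms
  "Converts a [value unit] vector to milliseconds."
  [[n unit]]
  (if-let [factor (unit->ms unit)]
    (* n factor)
    (throw (ex-info "Unsupported unit" {:unit unit}))))

(defn extents-per-value
  "Number of sliding window extents a value falls into: range / slide."
  [range-spec slide-spec]
  (/ (->ms range-spec) (->ms slide-spec)))

(extents-per-value [1 :minute] [30 :seconds])  ;; => 2
```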

Onyx is also capable of sliding by :elements. This is often referred to as "slide-by-tuple" in research. Onyx doesn’t require a time-based range and slide value. Any totally ordered value will work equivalently.

Aggregation

Windows allow you to accrete data over time. Sometimes, you want to store all the data. Other times, you want to incrementally compact the data. Window specifications must provide a :window/aggregation key. Onyx comes with a number of built-in aggregation functions that cover some common use-cases, but also allows you to provide your own. Below, we will walk through the built-in aggregation functions. For an example of how to provide your own aggregation function, see state-example.

:onyx.windowing.aggregation/conj

The :conj aggregation is the simplest. It collects segments for this window and retains them in a vector, unchanged.

{:window/id :collect-segments
 :window/task :identity
 :window/type :sliding
 :window/aggregation :onyx.windowing.aggregation/conj
 :window/window-key :event-time
 :window/range [30 :minutes]
 :window/slide [5 :minutes]
 :window/doc "Collects segments on a 30 minute window sliding every 5 minutes"}

:onyx.windowing.aggregation/count

The :onyx.windowing.aggregation/count operation counts the number of segments in the window.

{:window/id :count-segments
 :window/task :identity
 :window/type :fixed
 :window/aggregation :onyx.windowing.aggregation/count
 :window/window-key :event-time
 :window/range [1 :hour]
 :window/doc "Counts segments in one hour fixed windows"}

:onyx.windowing.aggregation/sum

The :sum operation adds the values of :age for all segments in the window.

{:window/id :sum-ages
 :window/task :identity
 :window/type :fixed
 :window/aggregation [:onyx.windowing.aggregation/sum :age]
 :window/window-key :event-time
 :window/range [1 :hour]
 :window/doc "Adds the :age key in all segments in 1 hour fixed windows"}

:onyx.windowing.aggregation/min

The :min operation retains the minimum value found for :age. An initial value must be supplied via :window/init.

{:window/id :min-age
 :window/task :identity
 :window/type :fixed
 :window/aggregation [:onyx.windowing.aggregation/min :age]
 :window/init 100
 :window/window-key :event-time
 :window/range [30 :minutes]
 :window/doc "Finds the minimum :age in 30 minute fixed windows, default is 100"}

:onyx.windowing.aggregation/max

The :max operation retains the maximum value found for :age. An initial value must be supplied via :window/init.

{:window/id :max-age
 :window/task :identity
 :window/type :fixed
 :window/aggregation [:onyx.windowing.aggregation/max :age]
 :window/init 0
 :window/window-key :event-time
 :window/range [30 :minutes]
 :window/doc "Finds the maximum :age in 30 minute fixed windows, default is 0"}

:onyx.windowing.aggregation/average

The :average operation maintains an average over :age. The state is maintained as a map with three keys - :n, the number of elements, :sum, the running sum, and :average, the running average.

{:window/id :average-age
 :window/task :identity
 :window/type :fixed
 :window/aggregation [:onyx.windowing.aggregation/average :age]
 :window/window-key :event-time
 :window/range [30 :minutes]
 :window/doc "Finds the average :age in 30 minute fixed windows, default is 0"}
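
To illustrate the three-key state described for :average, the sketch below folds a few values into a map of :n, :sum, and :average. The function average-step is hypothetical and is not the built-in implementation.

```clojure
(defn average-step
  "Folds one value into a running-average state map."
  [{:keys [n sum]} v]
  (let [n' (inc n)
        sum' (+ sum v)]
    {:n n' :sum sum' :average (/ sum' n')}))

(reduce average-step {:n 0 :sum 0 :average 0} [10 20 30])
;; => {:n 3, :sum 60, :average 20}
```

Note that Clojure's / returns a ratio when the sum is not evenly divisible. Wrap the division in double if a floating point average is preferred.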

:onyx.windowing.aggregation/collect-by-key

The :collect-by-key operation maintains a collection of all segments with a common key.

{:window/id :collect-members
 :window/task :identity
 :window/type :fixed
 :window/aggregation [:onyx.windowing.aggregation/collect-by-key :team]
 :window/window-key :event-time
 :window/range [30 :minutes]
 :window/doc "Collects all users on the same :team in 30 minute fixed windows"}

Grouping

All of the above aggregates have slightly different behavior when :onyx/group-by-key or :onyx/group-by-fn are specified on the catalog entry. Instead of maintaining a scalar value in the aggregate, Onyx maintains a map. The keys of the map are the grouped values, and the values of the map are normal scalar aggregates.

For example, if you had the catalog entry set to :onyx/group-by-key with value :name, and you used a window aggregate of :onyx.windowing.aggregation/count, and you sent through segments [{:name "john"} {:name "tiffany"} {:name "john"}], the aggregate map would look like {"john" 2 "tiffany" 1}. Since triggers fire once per group, each trigger only receives the count of a single group as state, not the entire aggregate map. To operate on all grouped values as a single map, you need to write a custom aggregation function; see state-example.
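
The grouped aggregate from this example can be reproduced with plain Clojure. The sketch below only mimics the shape of the state. grouped-count is a hypothetical helper, not an Onyx function.

```clojure
(def segments
  [{:name "john"} {:name "tiffany"} {:name "john"}])

(defn grouped-count
  "Builds a map from group value to the count of segments in that group."
  [group-key segments]
  (reduce (fn [aggregate segment]
            (update aggregate (get segment group-key) (fnil inc 0)))
          {}
          segments))

(grouped-count :name segments)               ;; => {"john" 2, "tiffany" 1}

;; A trigger firing for the "john" group would see only that group's count.
(get (grouped-count :name segments) "john")  ;; => 2
```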

Window Specification

See the Information Model chapter for an exact specification of what values the Window maps need to supply. Here we will describe what each of the keys means.

Key name              Description
:window/id            A unique identifier per window
:window/task          The workflow task over which the window operates
:window/type          Which type of window this is (fixed, sliding, etc)
:window/aggregation   The aggregation function to apply, as described above. If this operation is over a key, this is a vector, with the second element being the key.
:window/window-key    The key over which the range will be calculated
:window/range         The span of the window
:window/slide         The delay to wait to start a new window after the previous window
:window/init          The initial value required for some types of aggregation
:window/min-value     A strict minimum value that :window/window-key can ever be, default is 0.
:window/doc           An optional docstring explaining the window’s purpose

Triggers

In this section, we talk about Triggers. Triggers are a feature that interacts with Windows. Windows capture and bucket data over time. Triggers let you release the captured data in response to a variety of stimuli.

Summary

Windows capture data over time and place segments into discrete, possibly overlapping buckets. By itself, this is a relatively useless concept. In order to harness the information that has been captured and rolled up, we need to move it somewhere. Triggers let us interact with the state in each extent of a window.

Tip
Example project: aggregation

Trigger Types

Onyx ships a number of trigger implementations that can be used out of the box. Each trigger fires in response to a particular stimulus. All triggers implemented in Onyx core fire at task completion. We outline each here and show an example of each in action.

:timer

This trigger sleeps for a duration of :trigger/period. When it is done sleeping, the :trigger/sync function is invoked with its usual arguments. The trigger goes back to sleep and repeats itself.

{:trigger/window-id :collect-segments
 :trigger/on :onyx.triggers/timer
 :trigger/period [3 :seconds]
 :trigger/sync ::write-to-dynamo
 :trigger/doc "Writes state to DynamoDB every 3 seconds, discarding intermediate state"}

:segment

Trigger wakes up in reaction to a new segment being processed. Trigger only fires once every :trigger/threshold segments. When the threshold is exceeded, the count of new segments goes back to 0, and the looping proceeds again in the same manner.

{:trigger/window-id :collect-segments
 :trigger/on :onyx.triggers/segment
 :trigger/fire-all-extents? true
 :trigger/threshold [5 :elements]
 :trigger/sync ::write-to-stdout
 :trigger/doc "Writes the window contents to stdout every 5 segments"}

:punctuation

Trigger wakes up in reaction to a new segment being processed. Trigger only fires if :trigger/pred evaluates to true. The signature of :trigger/pred is of arity-2: trigger, state-event. Punctuation triggers are often useful to send signals through that indicate that no more data will be coming through for a particular window of time.

{:trigger/window-id :collect-segments
 :trigger/on :onyx.triggers/punctuation
 :trigger/pred ::trigger-pred
 :trigger/sync ::write-to-stdout
 :trigger/doc "Writes the window contents to stdout if :trigger/pred is true for this segment"}
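
A predicate for a punctuation trigger might look like the following sketch. It assumes that the state-event map carries the incoming segment under a :segment key, and it uses a hypothetical :end-of-window? flag in the segment as the punctuation signal. Both are assumptions made for illustration.

```clojure
(defn trigger-pred
  "Arity-2 predicate: fires when the incoming segment is marked as the
  last one for its window. :end-of-window? is a hypothetical key."
  [trigger state-event]
  (boolean (get-in state-event [:segment :end-of-window?])))

(trigger-pred {} {:segment {:event-time 10 :end-of-window? true}})  ;; => true
(trigger-pred {} {:segment {:event-time 11}})                       ;; => false
```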

:watermark

Trigger wakes up in reaction to a new segment being processed. Trigger only fires if the value of :window/window-key in the segment exceeds the upper-bound in the extent of an active window. This is a shortcut function for a punctuation trigger that fires when any piece of data has a time-based window key that is above another extent, effectively declaring that no more data for earlier windows will be arriving.

{:trigger/window-id :collect-segments
 :trigger/on :onyx.triggers/watermark
 :trigger/sync ::write-to-stdout
 :trigger/doc "Writes the window contents to stdout when this window's watermark has been exceeded"}

:percentile-watermark

Trigger wakes up in reaction to a new segment being processed. Trigger only fires if the value of :window/window-key in the segment exceeds the lower-bound plus the percentage of the range as indicated by :trigger/watermark-percentage, a double greater than 0 and less than 1. This is an alternative to :watermark that allows you to trigger on most of the data arriving, not necessarily every last bit.

{:trigger/window-id :collect-segments
 :trigger/on :onyx.triggers/percentile-watermark
 :trigger/watermark-percentage 0.95
 :trigger/sync ::write-to-stdout
 :trigger/doc "Writes the window contents to stdout when this window's watermark is exceeded by 95% of its range"}
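
The firing condition can be written out as arithmetic: the threshold is the extent's lower bound plus the configured fraction of the range. The helpers below are hypothetical and only illustrate the comparison described above.

```clojure
(defn percentile-threshold
  "Lower bound plus the given fraction of the window range."
  [lower-bound range-size watermark-percentage]
  (+ lower-bound (* watermark-percentage range-size)))

(defn fires?
  "True when the segment's window key value exceeds the threshold."
  [lower-bound range-size watermark-percentage window-key-value]
  (> window-key-value
     (percentile-threshold lower-bound range-size watermark-percentage)))

;; Extent starting at 0 with a range of 100 and a percentage of 0.95:
(fires? 0 100 0.95 99)  ;; => true
(fires? 0 100 0.95 50)  ;; => false
```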

Refinement Modes

A refinement mode allows you to articulate what should happen to the state of a window extent after a trigger has been invoked.

A refinement must implement the following interface:

(defn create-state-update [trigger state state-event]
;; create a state machine update from the window state and the event
)

(defn apply-state-update [trigger state entry]
;; apply that update to the window state after the trigger has been called
)

(def my-refinement
  {:refinement/create-state-update create-state-update
   :refinement/apply-state-update apply-state-update})
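
As an illustration of this interface, the sketch below implements a hypothetical refinement that resets the extent state to an empty vector after each trigger invocation. The shape of the update entry, [:reset []], is an assumption made for this example and is not a format mandated by Onyx.

```clojure
(defn reset-create-state-update [trigger state state-event]
  ;; Describe the change as data rather than performing it here.
  [:reset []])

(defn reset-apply-state-update [trigger state entry]
  (let [[op value] entry]
    (case op
      :reset value
      ;; Unknown commands leave the state untouched.
      state)))

(def reset-refinement
  {:refinement/create-state-update reset-create-state-update
   :refinement/apply-state-update reset-apply-state-update})
```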

Syncing

Onyx offers you the ultimate flexibility on what to do with your state during a trigger invocation. Set :trigger/sync to a fully qualified, namespaced keyword pointing to a function on the classpath at runtime. This function takes 5 arguments: the event map, the window map that this trigger is defined on, the trigger map, a state-event map, and the window state as an immutable value. Its return value is ignored.

This function is invoked when the trigger fires, and is used to do any arbitrary action with the window contents, such as sync them to a database. It is called once per window instance. In other words, if a fixed window exists with 5 instances, the firing of a Timer trigger will call the sync function 5 times. You can use lifecycles to supply any stateful connections necessary to sync your data. Supplied values from lifecycles will be available through the first parameter - the event map.
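
A sync function that simply prints the window state might look like this sketch. The name write-to-stdout mirrors the keyword used in the trigger examples above, and the output format is an arbitrary choice for illustration.

```clojure
(defn write-to-stdout
  "Sync function: receives the five arguments described above and
  prints the state of one window extent. The return value is ignored."
  [event window trigger state-event extent-state]
  (println "Window" (:window/id window) "state:" extent-state))

(write-to-stdout {} {:window/id :collect-segments} {} {} 3)
```

A function that writes to a database would follow the same shape, reading its connection from the event map after a lifecycle has placed it there.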

Trigger Specification

See the Information Model chapter for an exact specification of what values the Trigger maps need to supply. Here we will describe what each of the keys means.

Key name                     Description
:trigger/id                  An id for the trigger that is unique over the window that it is placed on. As of 0.10.0, :trigger/id values are required. :trigger/id is useful for resuming trigger state via resume points.
:trigger/window-id           A :window/id specified in the collection of windows.
:trigger/refinement          Fully qualified namespaced keyword for the mode of refinement.
:trigger/on                  Fully qualified namespaced keyword for the trigger called to determine whether to fire as a reaction e.g. :onyx.triggers/segment.
:trigger/sync                Fully qualified namespaced keyword of a function to call with the state.
:trigger/emit                A fully qualified, namespaced keyword pointing to a function on the classpath at runtime. This function takes 5 arguments: the event map, the window map that this trigger is defined on, the trigger map, a state-event map, and the window state as an immutable value. It must return a segment, or vector of segments, which will flow downstream.
:trigger/fire-all-extents?   When true, fires every extent of a window in response to a trigger.
:trigger/doc                 An optional docstring explaining the trigger’s purpose.

Watermarks

In this section, we talk about Watermarks. Watermarks are a feature that interacts with Windows and Triggers. Watermarks give you a way to use windows in a way that can relate to the event time/ingest time of the data that is being processed.

Summary

Watermarks can be used in two ways:

  1. Via :onyx/assign-watermark-fn on the input task, which will be applied to the segments read from an input task, and must return the number of milliseconds since epoch, or nil if none exists for that segment.

  2. Via native support in your Onyx input plugin, provided by implementing the onyx.plugin.protocols/WatermarkedInput protocol. Only the onyx-kafka plugin is currently supported; however, more plugin support is being worked on.
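
For the first option, a watermark function might look like the sketch below. It assumes segments carry a java.util.Date under :event-time and that the function is referenced from the catalog entry by a namespaced keyword, as other Onyx functions are. The task name and the :example.watermarks namespace are hypothetical.

```clojure
(defn assign-watermark
  "Returns the segment's event time in milliseconds since epoch,
  or nil when the segment has no :event-time."
  [segment]
  (when-let [t (:event-time segment)]
    (.getTime ^java.util.Date t)))

;; Only the watermark-related keys are shown. The other keys that an
;; input task requires (plugin, batch size, and so on) are omitted.
(def input-task-fragment
  {:onyx/name :read-events
   :onyx/type :input
   :onyx/assign-watermark-fn :example.watermarks/assign-watermark})
```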

Watermarks work as follows:

  1. Each input source reads a message with a watermarked time. If that time is later than the current watermark, the watermark is updated.

  2. The minimum watermark is computed across all of the input tasks and their peers. This is the new watermark for the inputs.

  3. The watermark flows through the DAG. The minimum watermark read across a task’s incoming edges is used as the current watermark for that task.

  4. As the minimum watermark increases for a task, all triggers are called with a state-event, where the event-type is :watermark, and where the state-event includes a :watermarks map, containing the current watermarks. The watermarks map is intended to be used by the trigger to decide whether to fire for a given window.

  5. The current minimum watermark for a task flows down to its downstream tasks.

The overall effect is that if you have two input tasks that are offset in event-time, it is easy to ensure that no windows are processed until both tasks have passed a given time.
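
The two rules at the heart of the steps above, "only move forward" and "take the minimum across inputs", can be sketched as follows. Both functions are hypothetical illustrations and are not part of Onyx's API.

```clojure
(defn advance-watermark
  "Moves a source's watermark forward only when the candidate is later."
  [current candidate]
  (if (and candidate (or (nil? current) (> candidate current)))
    candidate
    current))

(defn task-watermark
  "The watermark of a task is the minimum over its incoming edges."
  [edge-watermarks]
  (when (seq edge-watermarks)
    (apply min edge-watermarks)))

;; Two inputs offset in event time: the slower one holds the task back.
(task-watermark [1000 5000])  ;; => 1000
```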

The easiest use of watermarks is via the :onyx.triggers/watermark trigger, which only fires when the upper-bound of a window is passed, with an optional delay :trigger/delay.

Aggregation & State Management

This section discusses state management and fault tolerance used in windowing/streaming joins.

Summary

Onyx provides the ability to perform updates to a state machine for segments which are calculated over windows. For example, a grouping task may accumulate incoming values for a number of keys over windows of 5 minutes. This feature is commonly used for aggregations, such as summing values, though it can be used to build more complex state machines.

State Example

;; Task definition
{:onyx/name :sum-all-ages
 :onyx/fn :clojure.core/identity
 :onyx/type :function
 :onyx/group-by-key :name
 :onyx/flux-policy :recover
 :onyx/min-peers 2
 :onyx/batch-size 20}

;; Window definition
{:window/id :sum-all-ages-window
 :window/task :sum-all-ages
 :window/type :global
 :window/aggregation [:your-sum-ns/sum :age]
 :window/doc "Adds the :age key in all segments in a global window."}

As segments are processed, an internal state within the calculated window is updated. In this case we are trying to sum the ages of the incoming segments.

The :sum-all-ages-window window definition above refers to an aggregation through its :window/aggregation key. An aggregation is a map containing the following keys (see the cheat-sheet for more information):

Key                                 Optional?   Description
:aggregation/init                   true        Fn (window) to initialise the state.
:aggregation/create-state-update    false       Fn (window, segment) to generate a serializable state machine update.
:aggregation/apply-state-update     false       Fn (window, state, entry) to apply state machine update entry to a state.
:aggregation/super-aggregation-fn   true        Fn (window, state-1, state-2) to combine two states in the case of two windows being merged.

The :window/aggregation keys should map to corresponding functions. This example shows those function definitions, paired with the :window/aggregation keys, and bound to the ::sum aggregation referenced above.

(ns your-sum-ns)


(defn sum-init-fn [window]
  0)

;; Given the example input in the next section, the below segment shape (over a kafka transport) would look something like this.
;; {:serialized-key-size 36,
;;  :key "70144dea-cdd1-443d-9e7f-55cc5d0928d7",
;;  :offset 0,
;;  :serialized-value-size 22,
;;  :partition 0,
;;  :timestamp 1514680072539,
;;  :message {:age 49, :name "John"}}
(defn sum-aggregation-fn [window segment]
  (let [k (-> segment :message :age)]
    {:value k}))

;; Now just pull out the value and add it to the previous state
(defn sum-application-fn [window state value]
  (+ state (:value value)))


;; sum aggregation referenced in the window definition.
(def sum
  {:aggregation/init sum-init-fn
   :aggregation/create-state-update sum-aggregation-fn
   :aggregation/apply-state-update sum-application-fn})

Let’s try processing some example segments using this aggregation:

[{:name "John" :age 49}
 {:name "Madeline" :age 55}
 {:name "Geoffrey" :age 14}]

Results in the following events:

Action             Result
Initial state      0
Incoming segment   {:name "John" :age 49}
Changelog entry    [:set-value 49]
Applied to state   49
Incoming segment   {:name "Madeline" :age 55}
Changelog entry    [:set-value 104]
Applied to state   104
Incoming segment   {:name "Geoffrey" :age 14}
Changelog entry    [:set-value 118]
Applied to state   118

This state can be emitted via triggers or another mechanism. By describing changelog updates as a vector with a log command, such as :set-value, an aggregation function can emit multiple types of state transition if necessary.
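
The life cycle of the aggregation can be simulated outside of Onyx with plain function calls. The sketch below is a simplification under stated assumptions: it reads :age directly from each segment (rather than from a nested :message), ignores grouping, and uses the {:value n} entry shape from the function definitions above. run-aggregation and sum-super-aggregation-fn are hypothetical additions for illustration.

```clojure
(defn sum-init-fn [window] 0)

(defn sum-aggregation-fn [window segment]
  {:value (:age segment)})

(defn sum-application-fn [window state entry]
  (+ state (:value entry)))

;; Combines the states of two windows that are being merged.
(defn sum-super-aggregation-fn [window state-1 state-2]
  (+ state-1 state-2))

(defn run-aggregation
  "Returns every intermediate state, starting with the initial state."
  [window segments]
  (reductions (fn [state segment]
                (sum-application-fn window state
                                    (sum-aggregation-fn window segment)))
              (sum-init-fn window)
              segments))

(run-aggregation {} [{:name "John" :age 49}
                     {:name "Madeline" :age 55}
                     {:name "Geoffrey" :age 14}])
;; => (0 49 104 118)
```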

Fault Tolerance

To allow for full recovery after peer crashes, the window and trigger states must be checkpointed.

A consistent snapshot is performed over the cluster every :onyx.peer/coordinator-barrier-period-ms ms. Whenever a change to the cluster allocation occurs, this state snapshot is recovered from durable storage.

Storage can be configured via the peer-config.

The ZooKeeper window storage choice should not be used in production, unless paired with a periodic call to onyx.api/gc-checkpoints.

As checkpoints only accrete, typical production use should incorporate periodic calls to onyx.api/gc-checkpoints, and to onyx.api/clear-checkpoints once a job’s checkpoints will no longer be used to transfer state to another job via resume points.

Exactly Once Side-Effects

Side-effects resulting from a segment being processed may occur more than once, as exactly once side-effects are impossible to achieve. Onyx guarantees that window state updates resulting from a segment are performed exactly once; however, any side-effects that occur as a result of the segment being processed cannot be guaranteed to occur only once.

Resume Points

Resume Points is an Onyx feature used to transfer state between jobs.

Motivation

Onyx jobs maintain valuable state as they run. Such state includes the state of task windows, triggers, the offset an input task (for example Kafka topics) has read up to, etc. Onyx’s design enforces immutable jobs, which cannot be updated live. Given that production operations often require long running streaming jobs to be updated/redeployed, there needs to be a way to transition consistent state snapshots from job to job. Resume points is a data defined mapping between input and state checkpoints stored by killed/running/completed jobs to allow window contents, trigger contents and input offsets to be resumed in newly submitted jobs.

Resume Point Definition

A resume point takes the following form:

{:my-input-task {:input {:mode :resume
                         :task-id :src-input-task
                         :replica-version 4
                         :epoch 4
                         :tenancy-id #uuid "3b30664d-b442-d55b-c80b-979f9b7df12a"
                         :job-id #uuid "2d6fe8f3-f911-fd21-2db9-471d41fbe15b"
                         :slot-migration :direct}}
 :windowed-task {:windows {:my-window-1
                            {:mode :resume
                             :replica-version 4
                             :epoch 4
                             :tenancy-id #uuid "3b30664d-b442-d55b-c80b-979f9b7df12a"
                             :job-id #uuid "2d6fe8f3-f911-fd21-2db9-471d41fbe15b"
                             :task-id :src-windowed-task
                             :window-id :collect-segments
                             :slot-migration :direct}}}}

Each task with input checkpoint state or window state must include a direct mapping from a previous job. The format of resume points is purposefully verbose, in order to allow a job to resume state from multiple jobs, split a large job into multiple smaller jobs, rename tasks, etc. Note that windowed input tasks may define both :input and :windows keys in their resume point.

The resume point should then be supplied with the job, via the :resume-point key in the job map.

Failure modes: if a resume point is supplied, and any task, or task window does not contain a resume definition, onyx.api/submit-job will fail to validate the job. If a window is supplied with a resume point, and the resume point does not contain that window, the job will be killed at run-time.

Resume Point Modes

Each input and window resume definition uses one of two modes. The first mode is :resume, as demonstrated above. :resume mode requires that snapshot coordinates be supplied as part of the definition. The second mode is {:mode :initialize}, which, when supplied without any other keys, initializes the input or window state as if there were no state to resume from. This is useful when adding new tasks or windows to your job, as all tasks with input or window state must account for all their resume definitions in order to submit.
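
For instance, a resume point that starts every input and window from scratch might look like the following sketch. The task and window names are reused from the resume point shown earlier and are placeholders.

```clojure
;; Every input and window definition uses :initialize, so no snapshot
;; coordinates are needed.
(def resume-point
  {:my-input-task {:input {:mode :initialize}}
   :windowed-task {:windows {:my-window-1 {:mode :initialize}}}})
```

The two modes can also be mixed within one resume point, for example resuming an existing input while initializing a newly added window.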

Resume Point Helpers

Often the verbose form of the resume point is more specific than we really need, especially if the job is a direct resume of a job with the same definitions, backed by different underlying code. Therefore, Onyx supplies some APIs to improve the usability of resume points.

Given the snapshot coordinates:

(def snapshot-coordinates
  {:tenancy-id #uuid "1e5d841a-a77e-00c4-4758-b89c416c58ce"
   :job-id #uuid "9dd7b8f8-9854-40d7-8d93-2d205a1a2226"
   :replica-version 4
   :epoch 3})

Call (onyx.api/build-resume-point your-job-map snapshot-coordinates) to expand out a resume point that defaults to the given snapshot coordinates for all windows and input tasks.

The latest job snapshot coordinates can be read using the api call (onyx.api/job-snapshot-coordinates peer-config tenancy-id job-id).

Therefore to recover directly from a given job-id, one can call:

(->> job-id
     (onyx.api/job-snapshot-coordinates peer-config tenancy-id)
     (onyx.api/build-resume-point new-job)
     (assoc new-job :resume-point)
     (onyx.api/submit-job peer-config))

Testing Onyx Jobs

In this chapter, we’ll cover what you need to know about testing your Onyx application code.

Overview

Onyx eschews customized abstractions for describing distributed programs. As a consequence, application code written on top of Onyx ends up being regular Clojure functions and data structures. Onyx remains out of the way, and you’re free to test your functions as you would any other Clojure program. It’s also useful to roll up your entire job and test it as it will run in production. This section is dedicated to giving you a set of idioms and tools for working with Onyx jobs during the development phase.
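
Because task functions are ordinary Clojure functions, they can be exercised with clojure.test and no running Onyx environment. The function add-age-group below is a hypothetical task function used only for illustration.

```clojure
(ns example.functions-test
  (:require [clojure.test :refer [deftest is]]))

;; A hypothetical :onyx/fn: takes a segment and returns a segment.
(defn add-age-group [segment]
  (assoc segment :age-group (if (< (:age segment) 18) :minor :adult)))

(deftest add-age-group-test
  (is (= {:name "Geoffrey" :age 14 :age-group :minor}
         (add-age-group {:name "Geoffrey" :age 14})))
  (is (= :adult
         (:age-group (add-age-group {:name "John" :age 49})))))
```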

Automatic Resource Clean up

While it’s easy enough to run Onyx with an in-memory ZooKeeper instance (see the Environment chapter for how to do this), there are a host of development comforts that are missing when working on your Onyx code. One key pain point is the clean shutdown of resources on job completion or failure. Often when developing a new Onyx job, you’ll make lots of mistakes and your job will be killed. Before the next time you run your job, it’s a good idea to make sure your peers, peer group, and development environment all cleanly shut down. This can be a moderately tricky task at the repl where a Thread may be interrupted by a Control-C sequence. To this end, Onyx provides an onyx.test-helper namespace with a handy macro known as with-test-env.

with-test-env takes three parameters: a symbol to bind the development environment to, a sequence of [number-of-peers-to-start, env-config, peer-config] (in the order used by the example that follows), and a body of expressions. This macro starts a development environment with the requested number of peers, runs the body of expressions, and cleanly shuts the development environment down when either the body of expressions completes, or an exception is thrown (including ThreadInterruptedException). You’ll notice that this is an anaphoric macro. The macro creates a development environment, then binds it to the user supplied symbol. Using this macro, you don’t need to worry about ports remaining open that should have been closed on shutdown. You can safely interrupt Onyx at any point in time during the job execution.

In practice, this looks something like the following:

(deftest my-onyx-job-test
  (let [id (java.util.UUID/randomUUID)
        config (load-config)
        env-config (assoc (:env-config config) :onyx/tenancy-id id)
        peer-config (assoc (:peer-config config) :onyx/tenancy-id id)]
    (with-test-env [test-env [3 env-config peer-config]]
      (let [catalog ...
            workflow ...
            lifecycles ...]
        (onyx.api/submit-job peer-config
                             {:catalog catalog
                              :workflow workflow
                              :lifecycles lifecycles
                              :task-scheduler :onyx.task-scheduler/balanced})
        (let [results (take-segments! ...)
              expected ...]
          (is (= expected results)))))))

Code Reloading

Another painful part of writing asynchronous code in general is the matter of reloading code without restarting your repl. Something that plays particularly well with Onyx is clojure.tools.namespace. A pattern that we like to use is to create a user.clj file for the developer with the following contents:

(ns user
  (:require [clojure.tools.namespace.repl :refer [refresh set-refresh-dirs]]))

(set-refresh-dirs "src" "test")

(defn init [])

(defn start [])

(defn stop [])

(defn go []
  (init)
  (start))

(defn reset []
  (stop)
  (refresh))

When you want to reload your code, invoke the (reset) function. You can supply any extra application specific code in init, start, and stop. Combining this pattern with the with-test-env macro, you should virtually never need to restart your repl while developing Onyx code.

In-Memory I/O

Production level Onyx jobs talk to input streams like Kafka, or databases like Postgres. It’s not always helpful to run those pieces of infrastructure while developing your job. Early on, we like to just focus on what the shape of the data will look like and use in-memory I/O with core.async instead of Kafka or what have you. There’s plenty of documentation on how to actually use the core.async plugin. The big question is: how does one most effectively use core.async for development, and more realistic I/O targets for staging and production?

Our approach leverages Onyx’s all-data job specification. We’ve found it helpful to isolate the parts of the catalog and lifecycles that will vary between different environments and use a "builder" pattern. We start with a "base job" - the pieces of the job that are invariants across all environments:

(defn base-job [mode onyx-id task-scheduler]
  (let [datomic-uri (my-env-helper/get-datomic-uri mode onyx-id)]
    {:workflow wf/parse-event-workflow
     :catalog (cat/build-parse-event-catalog datomic-uri)
     :flow-conditions (fcp/parser-flow-conditions :parse-log
                                                  [:write-parse-failures]
                                                  [:write-to-datomic])
     :lifecycles sl/base-parse-event-lifecycles
     :task-scheduler task-scheduler}))

The function which builds the base job takes a "mode", indicating what environment we should construct. We like to use keywords - things like :dev, :staging, :prod. Functions that receive these keywords are often multimethods which dispatch on mode, building the appropriate configuration files. In this example, we use the mode parameter to vary which Datomic URI we’re going to use in our catalog.
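A minimal sketch of the mode-dispatch idea, for illustration only. The name get-datomic-uri mirrors the helper called in base-job, but its body, the :default method, and both URIs are hypothetical and not taken from any real project:

```clojure
;; Hypothetical helper: dispatch on the environment keyword.
(defmulti get-datomic-uri
  (fn [mode onyx-id] mode))

(defmethod get-datomic-uri :dev
  [_ onyx-id]
  ;; Illustrative in-memory database name, unique per Onyx ID.
  (str "datomic:mem://parse-events-" onyx-id))

(defmethod get-datomic-uri :prod
  [_ _]
  ;; Placeholder host; substitute your real transactor address.
  "datomic:dev://datomic.example.internal:4334/parse-events")

(defmethod get-datomic-uri :default
  [mode _]
  ;; Fail loudly on a mode nobody has configured.
  (throw (ex-info "Unknown mode" {:mode mode})))
```

Adding a new environment then means adding one defmethod, without touching base-job.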

Next, we add in environment-specific code using a little utility function to update the base job that’s passed in as a parameter:

(defn add-metrics [job m-config]
  (my-fun-utilities/add-to-job job {:lifecycles
                                     (metrics/build-metrics-lifecycles
                                       (:riemann/host m-config)
                                       (:riemann/port m-config))}))

(defn add-logging [job]
  (my-fun-utilities/add-to-job job {:lifecycles sl/log-parse-event-lifecycles}))
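The add-to-job utility is not shown in this guide. One plausible way to write it, assuming every value being added is a vector (as :lifecycles and :catalog are), is a one-line merge. This is a sketch of the idea, not the actual helper:

```clojure
;; Hypothetical implementation: concatenate each vector in `additions`
;; onto the vector stored under the same key in `job`.
;; Keys missing from `job` are simply added.
(defn add-to-job
  [job additions]
  (merge-with into job additions))
```

Note that this sketch only suits vector-valued keys; passing a scalar such as :task-scheduler in additions would need different handling.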

Finally, we put them all together with a simple cond->:

(let [mode :dev
      log? true
      metrics? false]
  (cond-> (base-job mode onyx-id task-scheduler)
    log? (add-logging)
    metrics? (add-metrics m-config)))

It’s important to remember that we’re working with plain Clojure data here. The sky is the limit on how you can put the building blocks together!

Backpressure

A common problem with streaming platforms is whether peers and tasks can exert backpressure on upstream tasks.

Onyx provides natural backpressure via Aeron’s bounded term buffers. Whenever a message is offered to a peer that has not consumed enough of its term buffer, the offer will not succeed and a backpressure signal will be sent back to the offering peer. As Onyx’s messaging is written in a non-blocking way, the peer will then back off for between idle-min-sleep-ns and idle-max-sleep-ns.
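The offer-then-back-off behavior can be illustrated with a plain bounded queue. This is not Onyx’s or Aeron’s implementation: the function name, the doubling of the sleep time, and the attempt limit are all assumptions made for the sketch.

```clojure
(import '(java.util.concurrent ArrayBlockingQueue)
        '(java.util.concurrent.locks LockSupport))

(defn offer-with-backoff!
  "Try a non-blocking offer to a bounded buffer. On failure (buffer full),
  sleep between min-sleep-ns and max-sleep-ns and retry, giving up after
  max-attempts. Returns true if the message was accepted."
  [^ArrayBlockingQueue buffer msg min-sleep-ns max-sleep-ns max-attempts]
  (loop [attempt 1
         sleep-ns min-sleep-ns]
    (cond
      ;; The consumer has room: the offer succeeds immediately.
      (.offer buffer msg) true
      ;; Still full after the last allowed attempt: report failure.
      (>= attempt max-attempts) false
      ;; Buffer full: idle briefly, then retry with a longer (capped) sleep.
      :else (do (LockSupport/parkNanos sleep-ns)
                (recur (inc attempt) (min max-sleep-ns (* 2 sleep-ns)))))))
```

The producer never blocks indefinitely on a slow consumer; it simply slows down, which is the effect backpressure is meant to have on upstream tasks.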

Onyx’s checkpointing mechanism is also subject to backpressure as only one checkpoint will be running for a job at any given time.

Deployment

Onyx has no built-in mechanism for deployment. Rather, we let you deploy at your comfort. We’ll describe some approaches to doing this.

Deployment Style

Unlike Hadoop and Storm, Onyx does not have a built-in deployment feature. To deploy your application, you need to uberjar your program and place it on every node in your cluster. Start up the uberjar, passing it your shared Onyx ID and ZooKeeper address. Once it connects, you’ve successfully deployed!
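As a rough sketch of what the uberjar’s entry point might do with those arguments: the function name, argument order, port number, and scheduler choice below are illustrative assumptions, and the onyx.api calls appear only in a comment so the snippet runs without Onyx on the classpath.

```clojure
;; Build a peer configuration from command-line arguments.
;; Argument order and the fixed values are assumptions for illustration.
(defn peer-config-from-args
  [[tenancy-id zk-address bind-addr]]
  {:onyx/tenancy-id tenancy-id
   :zookeeper/address zk-address
   :onyx.peer/job-scheduler :onyx.job-scheduler/balanced
   :onyx.messaging/impl :aeron
   :onyx.messaging/peer-port 40200
   :onyx.messaging/bind-addr bind-addr})

;; In the uberjar's -main you would then hand this map to Onyx, e.g.:
;;   (let [peer-group (onyx.api/start-peer-group (peer-config-from-args args))]
;;     (onyx.api/start-peers n-peers peer-group))
```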

We’ve chosen not to build in a deployment strategy because there are so many flexible approaches to handling deployment. S3, Docker, Mesos, Swarm, and Kubernetes are a few good choices. We’ll describe some of these strategies below.

Shared File System

Perhaps the most primitive deployment that you can use is a shared file system. Write a small script to SCP your uberjar to each of the nodes in your cluster and start the jar. You might use S3 for this, and a utility that allows you to manipulate a few SSH sessions in parallel. This is great for getting started on 3 nodes or so. You’ll want to use something a bit more elaborate as you go to production and get bigger, though.

Docker

We recommend packaging your uberjar into a Docker container. This is useful for locking in a specific Java version. Make sure you expose the ports that peers will be communicating on (this is configurable, see the peer configuration chapter). Once you have that down, you can upload your Docker image to DockerHub and use a simple script to pull down your Docker image and begin its execution. This is another step in the right direction, but read on to remove the scripting part of this task for real production systems.

Kubernetes

We have used Kubernetes extensively with Onyx and highly recommend it.

Mesos and Marathon

Mesos and Marathon are a pair of applications that work together to manage your entire cluster of machines. I recommend deploying your Docker image onto Marathon, which will allow you to scale at runtime and avoid any scripting. Marathon ensures that if your Docker container goes down, it will be restarted. This is one of our favorite solutions for production scale, cluster-wide management at the moment.

Production Check List

A production check list is included in the Environment documentation.

Monitoring

When setting up an Onyx cluster in production, it’s helpful to know what Onyx itself is doing. Onyx exposes a set of callbacks that are triggered on certain actions.

Reporters

When you’re taking Onyx to production, it’s not enough to know what your application-specific code is doing. You need to have insight into how Onyx is operating internally to be able to tune performance at an optimal level.

onyx-peer-http-query contains a Prometheus endpoint at /metrics. Alternatively, any monitoring Java agent that exports JMX metrics can be used to export metrics to other providers such as NewRelic.

Health Checks

onyx-peer-http-query contains an Aeron health check that we strongly recommend monitoring.

Task Monitoring

Onyx monitors numerous metrics related to each peer’s task operations and task states.

Each of the following metrics is scoped by job id, task, and peer id in the following format:

JMX tag: job.JOBID.task.TASKNAME.peer-id.PEERID.slot-id.SLOT-ID.METRIC ATTRIBUTE_NAME VALUE

Prometheus tag: METRIC_VALUETYPE{job=JOBID, task=TASKNAME, peer_id=PEERID} VALUE

Latency Metrics

Metric                            Description
recover_latency                   Time to fully recover the job after reallocation. Measured from the time the coordinator sends barrier with epoch 0.
checkpoint_serialization_latency  Latency to serialize the checkpoint for the task.
checkpoint_store_latency          Latency to store the checkpoint in durable storage.
serialization_latency             Latency to serialize segments for messaging.
since_heartbeat                   Time since this peer heartbeated.
since_received_heartbeat          Maximum time since heartbeat has been received from any peers. This is a good gauge of when a peer may be timed out by the peer receiving heartbeats.
task_lifecycle_apply_fn           Latency to call :onyx/fn on a batch of messages.
task_lifecycle_read_batch         Latency to read a batch of messages from messenger or the input medium.
task_lifecycle_write_batch        Latency to write a batch of messages to the messenger or the output medium.

Available Metric Types

  • 50thPercentile

  • 75thPercentile

  • 95thPercentile

  • 98thPercentile

  • 99thPercentile

  • 999thPercentile

  • Count

  • FifteenMinuteRate

  • FiveMinuteRate

  • Max

  • Mean

  • MeanRate

  • Min

  • OneMinuteRate

  • StdDev

Gauges / Counters

Metric                          Description
checkpoint_size                 Size of the last checkpoint.
checkpoint_read_bytes           Number of bytes read from checkpointed storage.
checkpoint_written_bytes        Number of bytes written to checkpointed storage.
replica_version                 The job’s allocation replica version that the peer is currently processing. All peers should have the same replica_version in normal operation, as peers with different replica versions are quarantined from each other.
lifecycle_index                 The index of the current lifecycle stage for this peer. This gives an indication of what state the peer is currently in. Please look at the onyx.log to see the mapping between lifecycle indexes and index names for this peer.
current_lifecycle_duration      The amount of time that the peer has been in the current lifecycle state. Good indication of whether a task may be blocked. See lifecycle_index to figure out what stage it is stuck in.
offset                          Storage medium offset for use by input/output plugins. For example, a Kafka plugin may report the offset that has been read up to in a topic partition.
epoch                           The barrier epoch that the peer is up to.
subscription_errors             Number of errors thrown by messenger subscription.
publication_errors              Number of errors thrown by messenger publication.
written_bytes                   Total number of bytes written via the messenger.
read_bytes                      Total number of bytes read from the messenger.
peer-group.scheduler-lag        Number of milliseconds that the peer group is behind the coordination log.
peer-group.peers-shutting-down  Number of peers that are currently shutting down. A persistently high number is a sign that something is going wrong with the peers, and that peers are being blocked on shutdown.

Available Metric Types

  • Value

Rate Metrics

Metric                                 Description
peer_group_peer_errors                 Rate of errors thrown by peers in this peer group.
epoch_rate                             Barrier flow rate.
task_lifecycle_apply_fn_throughput     Throughput for :onyx/fn application in segments.
task_lifecycle_read_batch_throughput   Throughput read from the input medium or messenger in segments.
task_lifecycle_write_batch_throughput  Throughput written to output medium or messenger in segments.

Available Metric Types

  • Count

  • FifteenMinuteRate

  • FiveMinuteRate

  • MeanRate

  • OneMinuteRate

Coordination Monitoring Events

This is the list of all monitoring events that you can register hooks for. The keys listed are present in the map that is passed to the callback function. The names of the events should readily identify what has taken place to trigger the callback.

Event Name                        Keys
:peer-group.since-heartbeat
:zookeeper-write-log-entry        :event, :latency, :bytes
:zookeeper-read-log-entry         :event, :latency, :bytes
:zookeeper-write-catalog          :event, :latency, :bytes
:zookeeper-write-workflow         :event, :latency, :bytes
:zookeeper-write-flow-conditions  :event, :latency, :bytes
:zookeeper-write-lifecycles       :event, :latency, :bytes
:zookeeper-write-windows          :event, :latency, :bytes
:zookeeper-write-triggers         :event, :latency, :bytes
:zookeeper-write-job-metadata     :event, :latency, :bytes
:zookeeper-write-task             :event, :latency, :bytes
:zookeeper-write-job-hash         :event, :latency, :bytes
:zookeeper-write-chunk            :event, :latency, :bytes
:zookeeper-write-job-scheduler    :event, :latency, :bytes
:zookeeper-write-messaging        :event, :latency, :bytes
:zookeeper-write-exception        :event, :latency, :bytes
:zookeeper-force-write-chunk      :event, :latency, :bytes
:zookeeper-write-origin           :event, :latency, :bytes
:zookeeper-read-catalog           :event, :latency
:zookeeper-read-workflow          :event, :latency
:zookeeper-read-flow-conditions   :event, :latency
:zookeeper-read-lifecycles        :event, :latency
:zookeeper-read-windows           :event, :latency
:zookeeper-read-triggers          :event, :latency
:zookeeper-read-job-metadata      :event, :latency
:zookeeper-read-task              :event, :latency
:zookeeper-read-job-hash          :event, :latency
:zookeeper-read-chunk             :event, :latency
:zookeeper-read-origin            :event, :latency
:zookeeper-read-job-scheduler     :event, :latency
:zookeeper-read-messaging         :event, :latency
:zookeeper-read-exception         :event, :latency
:zookeeper-gc-log-entry           :event, :latency, :position
:group-prepare-join               :event, :id
:group-notify-join                :event, :id
:group-accept-join
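To make the idea of a hook concrete, here is a sketch of a callback that records ZooKeeper log-write latencies. The two-argument callback shape (monitoring config, then the event map) and the :monitoring :custom entry are assumptions about how hooks are registered; check the monitoring documentation for your Onyx version before relying on them.

```clojure
;; Collects every event map seen by the hook.
(def write-events (atom []))

;; Assumed callback shape: receives the monitoring config and the event map,
;; which carries the keys listed for the event (:event, :latency, :bytes).
(defn record-write-event
  [config metric]
  (swap! write-events conj (select-keys metric [:event :latency :bytes])))

;; Assumed registration shape: a map from event name to callback.
(def monitoring-config
  {:monitoring :custom
   :zookeeper-write-log-entry record-write-event})
```

Because the callback is a plain function, it can be exercised directly in a test by calling it with a hand-built event map.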

Performance Tuning

This chapter details a few tips for getting your Onyx cluster to run as fast as possible.

Onyx

  • Use Clojure 1.7+. 1.7 and 1.8 have some considerable performance enhancements compared to 1.6.

  • SET UP METRICS. We cannot stress this point enough. Please read the guide at onyx-metrics to get started.

  • Set the timbre log level to elide logging calls in production. This should be done at compile time, i.e. when AOT compiling, or when running via Leiningen. You can do this using the environment variable: TIMBRE_LOG_LEVEL=info command_to_start_your_peers_or_compile_AOT.

  • Turn off Clojure assertions. Onyx uses assertions liberally to find errors early; however, these do have an associated cost. Use :global-vars {*assert* false} in your Leiningen profile, or (set! *assert* false) in your system bootup namespace.

  • Use JVM_OPTS="-server" in production.

  • Use JVM_OPTS="-XX:+UseG1GC". The G1GC has more predictable performance that can improve latency, though may decrease throughput by a small amount. It is strongly recommended you use this GC, unless you have a reason not to.

  • Check that you do not use reflection in any of the code running in your Onyx tasks’ critical paths, e.g. in :onyx/fn, before/after-batch lifecycles, aggregations, etc. You can check for reflection by using :global-vars {*warn-on-reflection* true} in your lein profile.

  • Tweak the :onyx/batch-size for the job’s tasks. A batch size of 20-100 segments is a reasonable start that will amortise the cost of Onyx’s task lifecycle vs latency.

  • For small segments, batch multiple segments into a single segment, and treat each new segment as a rolled up batch.

  • Tweak the batch timeout in each catalog entry to trade off increased latency for higher throughput. Counterintuitively, note that increased batch sizes can actually decrease average case latency, as improved throughput can decrease queuing effects.

  • The peer-config option :onyx.messaging/allow-short-circuit? (Peer Config) should be set to true in production.

  • Profile your code locally using Java Flight Recorder / Mission Control. The easiest way to get started is to create a benchmark test and add the following JVM_OPTS: -XX:+UnlockCommercialFeatures -XX:+FlightRecorder -XX:+UnlockDiagnosticVMOptions -XX:StartFlightRecording=duration=1080s,filename=localrecording.jfr. This will save a file to localrecording.jfr, which you can open in Mission Control (which can be started via the command jmc). You will need Oracle’s JVM to use this functionality.

  • Start roughly 1 virtual peer per core per machine. When using Aeron messaging, cores = virtual peers + subscribers is a good guideline. This recommendation is a good starting point, but it may not hold true when some virtual peers are largely idle, or spend much of their time I/O blocked.

  • Use a custom compression scheme, rather than Nippy. You can configure custom compression/decompression functions via the peer configuration.

  • Increase the number of acker peers through the peer configuration as your cluster gets larger.

  • Tune the number of Aeron subscriber threads, if serialization is a large proportion of work performed in tasks.
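Several of the tips above (assertions, reflection warnings, JVM flags) can be collected in Leiningen profiles. The fragment below is a sketch of a :profiles map as it might appear in project.clj; the profile names are illustrative, and splitting reflection warnings into :dev is our own assumption about a sensible layout.

```clojure
;; Fragment of a project.clj :profiles map (profile names are illustrative).
{:dev
 ;; Surface reflection in task code while developing.
 {:global-vars {*warn-on-reflection* true}}

 :production
 ;; Elide assertions and use the JVM flags recommended above.
 {:global-vars {*assert* false}
  :jvm-opts ["-server" "-XX:+UseG1GC"]}}
```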

ZooKeeper

  • Put the ZooKeeper journal on its own physical volume.

ZooKeeper tends to not get a huge amount of traffic, so this probably won’t offer a huge performance boost. It’s helpful if you’re trying to make your processing latency as predictable as possible, though.

Messaging

Aeron

Ensure you disable the embedded media driver, and instead use an independent media driver (see Media Driver).

When testing performance with a single node using the Aeron messaging layer, connection short circuiting may cause very misleading results.

Please refer to the Aeron messaging section for general discussion of the Aeron messaging implementation and its characteristics.

Environment

In this chapter, we’ll discuss what you need to set up development and production environments.

Development Environment

Dependencies

  • Java 8+

  • Clojure 1.7+

Explanation

One of the primary design goals of Onyx is to make the development environment as close as possible to production - without making the developer run a lot of services locally. A development environment in Onyx merely needs Clojure 1.7+ to operate. A ZooKeeper server is spun up in memory via Curator, so you don’t need to install ZooKeeper locally if you don’t want to.

Production Environment

Dependencies

  • Java 8+

  • Clojure 1.7+

  • ZooKeeper 3.4.5+

Multi-node & Production Checklist

Congratulations! You’re going to production, or at least testing your Onyx jobs with a multi-node setup.

We strongly recommend you run through this checklist before you do so, as it will likely save you a lot of time.

  • Ensure your JVM is running with JVM opts -server: performance will be greatly decreased if you run Onyx without at least the -server JVM opt.

  • Disable embedded ZooKeeper: when onyx.api/start-env is called with a config where :zookeeper/server? true, it will start an embedded ZooKeeper. :zookeeper/server? should be set to false in production.

  • Setup production ZooKeeper: A full ZooKeeper ensemble should be used in lieu of the testing ZooKeeper embedded in Onyx.

  • Increase maximum ZK connections: Onyx establishes a large number of ZooKeeper connections. If the limit is too low, you will see an exception like the following: WARN org.apache.zookeeper.server.NIOServerCnxn: Too many connections from /127.0.0.1 - max is 10. Increase the number of connections in zoo.cfg, via maxClientCnxns. This should be set to a number moderately larger than the number of virtual peers that you will start.

  • Configure ZooKeeper address to point to an ensemble: :zookeeper/address should be set in your peer-config e.g. :zookeeper/address "server1:2181,server2:2181,server3:2181".

  • Ensure all nodes are using the same :onyx/tenancy-id: :onyx/tenancy-id in the peer-config is used to denote which cluster a virtual peer should join. If all your nodes do not use the same :onyx/tenancy-id, they will not be a part of the same cluster and will not run the same jobs. Any jobs submitted to a cluster must also use the same :onyx/tenancy-id to ensure that cluster runs the job.

  • Do not use core.async tasks: switch all input or output tasks away from core.async, as it is not a suitable medium for multi-node use and will result in many issues when used in this way. The Kafka plugin is one recommended alternative.

  • Set messaging bind address: the messaging layer must be bound to the network interface used by peers to communicate. To do so, set :onyx.messaging/bind-addr in peer-config to a string defining the interface’s IP. On AWS, this IP can easily be obtained via (slurp "http://169.254.169.254/latest/meta-data/local-ipv4").

  • Is your bind address external facing?: If your bind address is something other than the one accessible to your other peers (e.g. docker, without net=host), then you will need to define an external address to advertise. This can be set via :onyx.messaging/external-addr in peer-config.

  • Open UDP ports for Aeron: Aeron requires the port defined in :onyx.messaging/peer-port to be open for UDP traffic.

  • Setup an external Aeron Media Driver: If messaging performance is a factor, it is recommended that the Aeron Media Driver is run out of process. First, disable the embedded driver by setting :onyx.messaging.aeron/embedded-driver? false. An example out of process media driver is included in lib-onyx. This media driver can be started via lein run -m, or via an uberjar, each by referencing the correct namespace, which contains a main entry point. Ensure that the media driver is started with JVM opts -server.

  • Set up metrics: when in production, it is essential that you are able to monitor Onyx’s metrics. We recommend using Prometheus with Onyx, and onyx-peer-http-query contains a Prometheus endpoint. Alternatively, use any Java Agent that is able to scrape JMX metrics.

  • Investigate Checkpoint Garbage Collection: onyx.api/gc-checkpoints will allow space taken by old checkpoints to be reclaimed from S3 or ZooKeeper.
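Pulling the configuration keys from this checklist together, a production setup might look roughly like the following. All keys come from the items above; every value (hosts, IPs, port, tenancy ID) is a placeholder, and other required options are left out.

```clojure
(def env-config
  {:zookeeper/address "server1:2181,server2:2181,server3:2181"
   :zookeeper/server? false             ; no embedded ZooKeeper in production
   :onyx/tenancy-id "my-cluster"})

(def peer-config
  {:zookeeper/address "server1:2181,server2:2181,server3:2181"
   :onyx/tenancy-id "my-cluster"        ; must match on every node and every job
   :onyx.messaging/bind-addr "10.0.0.12"        ; placeholder interface IP
   :onyx.messaging/external-addr "203.0.113.7"  ; only needed if bind-addr is not reachable by other peers
   :onyx.messaging/peer-port 40200              ; placeholder; must be open for UDP
   :onyx.messaging.aeron/embedded-driver? false ; media driver runs out of process
   })
```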

ZooKeeper

Environment Launch of In-Memory ZooKeeper

To launch an in-memory ZooKeeper instance, add :zookeeper/server? true to the environment options. Also, specify :zookeeper.server/port <my port> so that Curator knows what port to start running the server on.

If your deployment throws an exception and doesn’t shut down ZooKeeper, it will remain open. Firing up the environment again will cause a port collision, so be sure to restart your REPL in that case.

Peer Connection to In-Memory ZooKeeper

Add :zookeeper/address "127.0.0.1:<my port>" to the peer options as usual. In-memory ZooKeeper is completely opaque to the peer.

Example

Here’s an example of using ZooKeeper in-memory, with some non-ZooKeeper required parameters elided.

(def env-config
  {:zookeeper/address "127.0.0.1:2182"
   :zookeeper/server? true
   :zookeeper.server/port 2182
   :onyx/tenancy-id id})

(def peer-opts
  {:zookeeper/address "127.0.0.1:2182"
   :onyx/tenancy-id id})

Networking / Firewall

Messaging requires the UDP port set in :onyx.messaging/peer-port to be open.

All peers require the ability to connect to the ZooKeeper instances over TCP.

Explanation

Running a ZooKeeper cluster is a requirement for a lot of fault tolerant systems. See this link for getting set up. I won’t go into detail since this is a particularly common set up. We recommend using Exhibitor to manage clustered ZooKeeper.

Example

Notice that all we’re doing is extending the address string to include more host:port pairs. This uses the standard ZooKeeper connection string, so you can use authentication here too if you need it.

(def peer-opts
  {...
   :zookeeper/address "10.132.8.150:2181,10.132.8.151:2181,10.132.8.152:2181"
   ...})

Information Model

The information model is now described in the Cheat Sheet. The information model is also described in a nested map.

Scheduling

Onyx offers fine-grained control of how many peers are allocated to particular jobs and tasks. This section outlines how to use the built-in schedulers.

Allocating Peers to Jobs and Tasks

In a masterless design, there is no single entity that assigns tasks to peers. Instead, peers need to contend for tasks to execute as jobs are submitted to Onyx. Conversely, as peers are added to the cluster, the peers must "shift" to distribute the workload across the cluster. Onyx ships with out-of-the-box job and task allocation policies. End users can change the levels of fairness that each job gets with respect to cluster power. And remember, one virtual peer executes at most one task.

Job Schedulers

Each running Onyx instance is configured with exactly one job scheduler. The purpose of the job scheduler is to coordinate which jobs peers are allowed to volunteer to execute. There are a few different kinds of schedulers, listed below. To use each, configure the Peer Options map with key :onyx.peer/job-scheduler to the value specified below.

Greedy Job Scheduler

The Greedy job scheduler allocates all peers to each job in the order that it was submitted. For example, suppose you had 100 virtual peers and you submitted two jobs - job A and job B. With a Greedy scheduler, all 100 peers would be allocated to job A. If job A completes, all 100 peers will then execute tasks for job B. This probably isn’t desirable when you’re running streaming workflows, since they theoretically never end.

To use this scheduler, set key :onyx.peer/job-scheduler to :onyx.job-scheduler/greedy in the Peer Options.

Peer Addition

In the event that a peer joins the cluster while the Greedy job scheduler is running, that new peer will be allocated to the current job that is being run greedily.

Peer Removal

In the event that a peer leaves the cluster while the Greedy job scheduler is running, no peers will be shifted off of the job that is greedily running.

Job Addition

If a job is submitted while this scheduler is running, no peers will be allocated to this job. The only exception to this rule is if no jobs are currently running. In this case, all peers will be allocated to this job.

Job Removal

If a job is completed or otherwise canceled, all of the peers that were executing that job will move to the job that was submitted after it.

Balanced Job Scheduler

The Balanced job scheduler allocates peers in a rotating fashion to jobs that were submitted. For example, suppose that you had 100 virtual peers (virtual peer 1, virtual peer 2, …​ virtual peer 100) and you submitted two jobs - job A and job B. With a Balanced scheduler, both jobs will end up with 50 virtual peers allocated to each. This scheduler begins allocating by selecting the first job submitted.

To use this scheduler, set key :onyx.peer/job-scheduler to :onyx.job-scheduler/balanced in the Peer Options.

Peer Addition

In the event that a peer joins the cluster while the Balanced scheduler is running, that new peer will be allocated to the job that most evenly balances the cluster according to the number of jobs divided by the number of peers. If there is a tie, the new peer is added to the earliest submitted job.

Peer Removal

In the event that a peer leaves the cluster while the Balanced scheduler is running, the peers across all jobs will be rebalanced to evenly distribute the workload.

Job Addition

If a job is submitted while this scheduler is running, the entire cluster will be rebalanced. For example, if job A has all 100 peers executing its task, and job B is submitted, 50 peers will move from job A to job B.

Job Removal

If a job is completed or otherwise canceled while this scheduler is running, the entire cluster will be rebalanced. For example, if jobs A, B, and C each had 20 peers executing their tasks (60 peers total), and job C finished, job A would gain 10 peers, and job B would gain 10 peers.
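The arithmetic behind these examples can be sketched as a small function. This is a simplified model, not Onyx’s scheduler: it ignores the minimum number of peers a job needs, and the function name is ours.

```clojure
(defn balanced-allocation
  "Split n-peers across jobs (given in submission order) as evenly as
  possible, handing any remainder to the earliest submitted jobs."
  [n-peers jobs]
  (let [n-jobs (count jobs)]
    (if (zero? n-jobs)
      {}
      (let [base (quot n-peers n-jobs)    ; every job gets at least this many
            extra (rem n-peers n-jobs)]   ; the first `extra` jobs get one more
        (into {}
              (map-indexed (fn [i job] [job (if (< i extra) (inc base) base)])
                           jobs))))))
```

For instance, (balanced-allocation 60 [:a :b :c]) gives each job 20 peers, and (balanced-allocation 60 [:a :b]) models the cluster after job C finishes, with 30 peers each.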

Percentage Job Scheduler

The Percentage job scheduler allows jobs to be submitted with a percentage value. The percentage value indicates what percentage of the cluster will be allocated to this job. The use case for this scheduler is for when you have a static number of jobs and a varying number of peers. For example, if you have 2 jobs - A and B, you’d give each of them a percentage value - say 70% and 30%, respectively. If you had 100 virtual peers running, 70 would be allocated to A, and 30 to B. If you then added 100 more peers to the cluster, job A would be allocated 140 peers, and job B 60. This dynamic scaling is a big step forward over statically configuring slots, which is normal in ecosystems like Hadoop and Storm.

If there aren’t enough peers to satisfy the percentage values of all the jobs, this scheduler will allocate with priority to jobs with the highest percentage value. When percentage values are equal, the earliest submitted job will get priority. In the event that jobs are submitted, and the total percentage value exceeds 100%, the earliest submitted jobs that do not exceed 100% will receive peers. Jobs that go beyond it will not receive peers. For example, if you submitted jobs A, B, and C with 70%, 30%, and 20% respectively, jobs A and B would receive peers, and C will not be allocated any peers until either A or B completes.

If the percentages of all submitted jobs don’t sum to 100%, the job with the highest percentage value will receive the extra peers. When percentage values are equal, the earliest submitted job will get priority.

If the algorithm determines that any job should receive a number of peers that is less than the minimum number it needs to execute, that job receives no peers.

To use this scheduler, set key :onyx.peer/job-scheduler to :onyx.job-scheduler/percentage in the Peer Options.
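A simplified model of the percentage arithmetic, for intuition only. It assumes the percentages total 100 or less and ignores the minimum-peers rule, so it is not a substitute for the real scheduler; the function name and input shape are ours.

```clojure
(defn percentage-allocation
  "jobs is a vector of [job-id percentage] pairs in submission order.
  Each job gets the floor of its share; leftover peers go to the job with
  the highest percentage (earliest submitted wins ties)."
  [n-peers jobs]
  (if (empty? jobs)
    {}
    (let [shares (into {} (for [[job pct] jobs]
                            [job (quot (* n-peers pct) 100)]))
          leftover (- n-peers (reduce + (vals shares)))
          ;; sort-by is stable, so equal percentages keep submission order.
          [winner _] (first (sort-by (comp - second) jobs))]
      (update shares winner + leftover))))
```

With 100 peers and jobs at 70% and 30%, this yields 70 and 30; doubling the cluster to 200 peers yields 140 and 60.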

Peer Addition

In the event that a peer joins the cluster while the Percentage job scheduler is running, the entire cluster will rebalance.

Peer Removal

In the event that a peer leaves the cluster while the Percentage job scheduler is running, the entire cluster will rebalance.

Job Addition

If a job is submitted while this scheduler is running, the entire cluster will be rebalanced.

Job Removal

If a job is completed or otherwise canceled while this scheduler is running, the entire cluster will be rebalanced.

Task Schedulers

Each Onyx job is configured with exactly one task scheduler. The task scheduler is specified at the time of calling submit-job. The purpose of the task scheduler is to control the order in which available peers are allocated to which tasks. There are a few different Task Scheduler implementations, listed below. To use each, call onyx.api/submit-job. The second argument of this function is a map. Supply a :task-scheduler key and map it to the value specified below.

Balanced Task Scheduler

The Balanced Scheduler takes a topological sort of the workflow for a specific job. As peers become available, this scheduler assigns tasks to peers in a rotating order. For example, if a workflow has a topological sort of tasks A, B, C, and D, this scheduler assigns each peer to tasks A, B, C, D, A, B, C, D, …​ and so on.

To use, set :task-scheduler in submit-job to :onyx.task-scheduler/balanced.

Peer Removal

If a peer fails, or is otherwise removed from the cluster, the Task scheduler defers to the Job scheduler to rebalance the cluster. If a new peer is added to this task as a result of a peer failing in another job, it is assigned the next task in the balanced sequence.

Max Peer Parameter

With the Balanced Task Scheduler, each entry in the catalog can specify a key of :onyx/max-peers with an integer value > 0. When this key is set, Onyx will never assign that task more than that number of peers. Balanced will simply skip the task for allocation when more peers are available, and continue assigning peers in a balanced fashion to the other tasks.
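The rotating assignment and the :onyx/max-peers cap can be modeled in a few lines. This is an illustrative sketch rather than Onyx’s code; the function name and the shape of the max-peers argument (a map of task to cap) are assumptions.

```clojure
(defn balanced-task-allocation
  "Hand out n-peers one at a time over tasks (in topological order),
  skipping any task that has reached its cap. max-peers maps task -> cap;
  tasks without an entry are uncapped. Returns a map of task -> peer count."
  [n-peers tasks max-peers]
  (loop [remaining n-peers
         alloc (zipmap tasks (repeat 0))]
    ;; Tasks that can still accept another peer, in rotation order.
    (let [open (filter #(< (alloc %) (get max-peers % Long/MAX_VALUE)) tasks)]
      (if (or (zero? remaining) (empty? open))
        alloc
        ;; One pass of the rotation: one peer per open task, while peers last.
        (let [round (take remaining open)]
          (recur (- remaining (count round))
                 (reduce #(update %1 %2 inc) alloc round)))))))
```

With six peers over tasks A, B, C, D and no caps, A and B end up with two peers each; capping A at one peer shifts its second peer to C.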

Percentage Task Scheduler

The Percentage Scheduler takes a set of tasks, all of which must be assigned a percentage value (:onyx/percentage) in the corresponding catalog entries. The percentage values must add up to 100 or less. Percent values may be integers between 1 and 99, inclusive. This scheduler will allocate peers for this job in proportion to the percentages of the specified tasks. As more or fewer peers join the cluster, allocations will automatically scale. For example, if a job has tasks A, B, and C with 70%, 20%, and 10% specified as their percentages, and there are 10 peers, task A receives 7 peers, B 2 peers, and C 1 peer.

This scheduler handles corner cases (fractions of peers) in the same way as the Percentage Job Scheduler. See that documentation for a full description.

To use, set :task-scheduler in submit-job to :onyx.task-scheduler/percentage.
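As a sketch, a job whose three tasks should receive 70%, 20%, and 10% of its peers might carry the following fragments. The task names are hypothetical and all other required catalog keys are omitted for brevity.

```clojure
;; Catalog fragment: only the keys relevant to the scheduler are shown.
[{:onyx/name :task-a
  :onyx/percentage 70}
 {:onyx/name :task-b
  :onyx/percentage 20}
 {:onyx/name :task-c
  :onyx/percentage 10}]

;; Job map fragment passed to onyx.api/submit-job.
{:task-scheduler :onyx.task-scheduler/percentage}
```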

Peer Removal

If a peer fails, or is otherwise removed from the cluster, the Task scheduler rebalances all the peers for even distribution.

Colocation Task Scheduler

The Colocation Scheduler takes all of the tasks for a job and, if possible, assigns them to the peers on a single physical machine represented by the same peer group. If a job has 4 tasks and the cluster is one machine with 5 peers, 4 peers will become active. If that machine had 8 peers, all 8 would become active, as this scheduler operates in peer chunks that are divisible by the task size. If more machines are capable of executing the entire job, they will also be used.
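The chunking arithmetic in these examples amounts to rounding the machine’s peer count down to a multiple of the number of tasks. A tiny sketch (the function name is ours, and this models only the arithmetic, not the scheduler):

```clojure
(defn colocated-active-peers
  "Number of peers on one machine that become active for a job with
  n-tasks tasks: the largest multiple of n-tasks not exceeding the
  number of peers on that machine."
  [n-peers-on-machine n-tasks]
  (* n-tasks (quot n-peers-on-machine n-tasks)))
```

A machine with fewer peers than the job has tasks contributes no peers under this model, since it cannot run the entire job locally.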

This scheduler is useful for dramatically increasing performance of jobs where the latency is bound by the network of transmitting data across tasks. Using this scheduler with peer short circuiting will ensure that segments are never serialized and never cross the network between tasks (with the exception of grouping tasks). Onyx’s usual fault tolerance mechanisms are still used to ensure that data is processed in the presence of machine failure.

To use, set :task-scheduler in submit-job to :onyx.task-scheduler/colocated.

To use colocation, but to disable the scheduler’s affinity to always send segments to a local peer, set :onyx.task-scheduler.colocated/only-send-local? to false in the peer config. This is desirable when optimal performance depends on the uniformity of tasks being evenly assigned to machines in your cluster, but strictly local execution is not helpful for performance.

Peer Addition

If a peer is added to the cluster and its machine is capable of executing all the tasks for this job, the entire machine will be used - provided that it falls into the pool of peers eligible to execute this job, per the job scheduler’s prerogative.

Peer Removal

If a peer is removed, all the peers associated with this job’s tasks for this chunk of peers will stop executing their tasks.

Tags

It’s often the case that a set of machines in your cluster is privileged in some way. Perhaps they are running special hardware, or they live in a specific data center, or they have a license to use a proprietary database. Sometimes, you’ll have Onyx jobs that require tasks to run on a predetermined set of machines. Tags are a feature that lets peers denote "capabilities". Tasks may declare which tags peers must have in order to be selected to execute them.

Peers

To declare a peer as having special capabilities, use a vector of keywords in the Peer Configuration under key :onyx.peer/tags. For example, if you wanted to declare that a peer has a license for its JVM to communicate with Datomic, you might add this to your Peer Configuration:

{...
 :onyx/tenancy-id "my-cluster"
 :onyx.peer/tags [:datomic]
 ...
}

You can specify multiple tags. The default is no tags ([]), in which case this peer can execute any tasks that do not require tags.

Tasks

Now that we have a means for expressing which peers can do which kinds of things, we’ll need a way to express which tasks require which capabilities. We do this in the catalog. Any task can use the key :onyx/required-tags with a vector of keywords as a value. Any peer that executes this task is guaranteed to have :onyx/required-tags as a subset of its :onyx.peer/tags.

For example, to declare that task :read-datoms must be executed by a peer that can talk to Datomic, you might write:

[{:onyx/name :read-datoms
  :onyx/plugin :onyx.plugin.datomic/read-datoms
  :onyx/type :input
  :onyx/medium :datomic
  :onyx/required-tags [:datomic] ;; <- Add this!
  :datomic/uri db-uri
  ...
  :onyx/batch-size batch-size
  :onyx/doc "Reads a sequence of datoms from the d/datoms API"}
 ...
]
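The subset rule between :onyx/required-tags and :onyx.peer/tags can be sketched as a predicate over the two maps. This is an illustration of the rule only, not Onyx's scheduling code, and peer-can-run? is a hypothetical name.

```clojure
(require '[clojure.set :as set])

;; Hypothetical helper: true when the task's required tags are a subset
;; of the tags declared by the peer. Missing keys are treated as no tags.
(defn peer-can-run? [peer-config catalog-entry]
  (set/subset? (set (:onyx/required-tags catalog-entry))
               (set (:onyx.peer/tags peer-config))))

(def read-datoms {:onyx/name :read-datoms
                  :onyx/required-tags [:datomic]})

(peer-can-run? {:onyx.peer/tags [:datomic]} read-datoms) ;; => true
(peer-can-run? {:onyx.peer/tags []} read-datoms)         ;; => false
(peer-can-run? {:onyx.peer/tags []} {:onyx/name :inc})   ;; => true, nothing required
```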

Event Subscription

Onyx’s log-based design provides open-ended access to react to all coordination events. This section describes how to tap into these notifications.

Explanation

Onyx uses an internal log to totally order all coordination events across nodes in the cluster. This log is maintained as a directory of sequentially ordered znodes in ZooKeeper. It’s often of interest to watch the events as they are written to the log. For instance, you may want to know when a particular task is completed, or when a peer joins or leaves the cluster. You can use the log subscriber to do just that.

Subscribing to the Log

The following is a complete example that pretty prints all events as they are written to the log. You need to provide the ZooKeeper address, Onyx ID, and shared job scheduler in the peer config. The subscriber will automatically track and recover from sequential read errors in the case that a garbage collection is triggered, deleting log entries in its path.

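(require '[clojure.core.async :refer [chan <!!]]
         '[clojure.pprint]
         '[onyx.api]
         '[onyx.extensions])

;; onyx-id must be bound to the ID of the cluster you want to observe.
(def peer-config
  {:zookeeper/address "127.0.0.1:2181"
   :onyx/tenancy-id onyx-id
   :onyx.peer/job-scheduler :onyx.job-scheduler/round-robin})

;; Log entries are delivered on this channel.
(def ch (chan 100))

(def subscription (onyx.api/subscribe-to-log peer-config ch))

(def log (:log (:env subscription)))

;; Loops forever, applying each entry to the replica and printing the result
(loop [replica (:replica subscription)]
  (let [entry (<!! ch)
        new-replica (onyx.extensions/apply-log-entry entry replica)]
    (clojure.pprint/pprint new-replica)
    (recur new-replica)))

;; Only reached if the loop above is interrupted or replaced with one that exits
(onyx.api/shutdown-env (:env subscription))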

Some example output from a test, printing the log position, log entry content, and the replica as-of that log entry:

====
Log Entry #0
Entry is {:message-id 0, :fn :prepare-join-cluster, :args {:joiner #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"}}
Replica is:
{:peer-state {#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577" :idle},
 :peers [#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"],
 :job-scheduler :onyx.job-scheduler/greedy}
====
Log Entry #1
Entry is {:message-id 1, :fn :prepare-join-cluster, :args {:joiner #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"}}
Replica is:
{:prepared
 {#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"},
 :peer-state {#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577" :idle},
 :peers [#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"],
 :job-scheduler :onyx.job-scheduler/greedy}
====
Log Entry #2
Entry is {:message-id 2, :fn :notify-join-cluster, :args {:observer #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf", :subject #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"}}
Replica is:
{:accepted
 {#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"},
 :prepared {},
 :peer-state {#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577" :idle},
 :peers [#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"],
 :job-scheduler :onyx.job-scheduler/greedy}
====
Log Entry #3
Entry is {:message-id 3, :fn :accept-join-cluster, :args {:observer #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf", :subject #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577", :accepted-observer #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577", :accepted-joiner #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"}}
Replica is:
{:pairs
 {#uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"
  #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577",
  #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"},
 :accepted {},
 :prepared {},
 :peer-state
 {#uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf" :idle,
  #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577" :idle},
 :peers
 [#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"],
 :job-scheduler :onyx.job-scheduler/greedy}
====
Log Entry #4
Entry is {:message-id 4, :fn :prepare-join-cluster, :args {:joiner #uuid "010a1688-47ff-4055-8da5-1f02247351e1"}}
Replica is:
{:pairs
 {#uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"
  #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577",
  #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"},
 :accepted {},
 :prepared
 {#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
  #uuid "010a1688-47ff-4055-8da5-1f02247351e1"},
 :peer-state
 {#uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf" :idle,
  #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577" :idle},
 :peers
 [#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"],
 :job-scheduler :onyx.job-scheduler/greedy}
====
Log Entry #5
Entry is {:message-id 5, :fn :notify-join-cluster, :args {:observer #uuid "010a1688-47ff-4055-8da5-1f02247351e1", :subject #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"}}
Replica is:
{:pairs
 {#uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"
  #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577",
  #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"},
 :accepted
 {#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
  #uuid "010a1688-47ff-4055-8da5-1f02247351e1"},
 :prepared {},
 :peer-state
 {#uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf" :idle,
  #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577" :idle},
 :peers
 [#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"],
 :job-scheduler :onyx.job-scheduler/greedy}
====
Log Entry #6
Entry is {:message-id 6, :fn :accept-join-cluster, :args {:observer #uuid "010a1688-47ff-4055-8da5-1f02247351e1", :subject #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf", :accepted-observer #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577", :accepted-joiner #uuid "010a1688-47ff-4055-8da5-1f02247351e1"}}
Replica is:
{:pairs
 {#uuid "010a1688-47ff-4055-8da5-1f02247351e1"
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf",
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"
  #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577",
  #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
  #uuid "010a1688-47ff-4055-8da5-1f02247351e1"},
 :accepted {},
 :prepared {},
 :peer-state
 {#uuid "010a1688-47ff-4055-8da5-1f02247351e1" :idle,
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf" :idle,
  #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577" :idle},
 :peers
 [#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"
  #uuid "010a1688-47ff-4055-8da5-1f02247351e1"],
 :job-scheduler :onyx.job-scheduler/greedy}
====
Log Entry #7
Entry is {:message-id 7, :fn :prepare-join-cluster, :args {:joiner #uuid "e6c35131-f4d9-432d-8915-e8616851bb1c"}}
Replica is:
{:pairs
 {#uuid "010a1688-47ff-4055-8da5-1f02247351e1"
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf",
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"
  #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577",
  #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
  #uuid "010a1688-47ff-4055-8da5-1f02247351e1"},
 :accepted {},
 :prepared
 {#uuid "010a1688-47ff-4055-8da5-1f02247351e1"
  #uuid "e6c35131-f4d9-432d-8915-e8616851bb1c"},
 :peer-state
 {#uuid "010a1688-47ff-4055-8da5-1f02247351e1" :idle,
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf" :idle,
  #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577" :idle},
 :peers
 [#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"
  #uuid "010a1688-47ff-4055-8da5-1f02247351e1"],
 :job-scheduler :onyx.job-scheduler/greedy}
====
Log Entry #8
Entry is {:message-id 8, :fn :notify-join-cluster, :args {:observer #uuid "e6c35131-f4d9-432d-8915-e8616851bb1c", :subject #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"}}
Replica is:
{:pairs
 {#uuid "010a1688-47ff-4055-8da5-1f02247351e1"
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf",
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"
  #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577",
  #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
  #uuid "010a1688-47ff-4055-8da5-1f02247351e1"},
 :accepted
 {#uuid "010a1688-47ff-4055-8da5-1f02247351e1"
  #uuid "e6c35131-f4d9-432d-8915-e8616851bb1c"},
 :prepared {},
 :peer-state
 {#uuid "010a1688-47ff-4055-8da5-1f02247351e1" :idle,
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf" :idle,
  #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577" :idle},
 :peers
 [#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"
  #uuid "010a1688-47ff-4055-8da5-1f02247351e1"],
 :job-scheduler :onyx.job-scheduler/greedy}
====
Log Entry #9
Entry is {:message-id 9, :fn :accept-join-cluster, :args {:observer #uuid "e6c35131-f4d9-432d-8915-e8616851bb1c", :subject #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf", :accepted-observer #uuid "010a1688-47ff-4055-8da5-1f02247351e1", :accepted-joiner #uuid "e6c35131-f4d9-432d-8915-e8616851bb1c"}}
Replica is:
{:pairs
 {#uuid "e6c35131-f4d9-432d-8915-e8616851bb1c"
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf",
  #uuid "010a1688-47ff-4055-8da5-1f02247351e1"
  #uuid "e6c35131-f4d9-432d-8915-e8616851bb1c",
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"
  #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577",
  #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
  #uuid "010a1688-47ff-4055-8da5-1f02247351e1"},
 :accepted {},
 :prepared {},
 :peer-state
 {#uuid "e6c35131-f4d9-432d-8915-e8616851bb1c" :idle,
  #uuid "010a1688-47ff-4055-8da5-1f02247351e1" :idle,
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf" :idle,
  #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577" :idle},
 :peers
 [#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"
  #uuid "010a1688-47ff-4055-8da5-1f02247351e1"
  #uuid "e6c35131-f4d9-432d-8915-e8616851bb1c"],
 :job-scheduler :onyx.job-scheduler/greedy}
====
Log Entry #10
Entry is {:message-id 10, :fn :prepare-join-cluster, :args {:joiner #uuid "bf8fd5fc-30fd-424c-af6a-0b32568581a4"}}
Replica is:
{:pairs
 {#uuid "e6c35131-f4d9-432d-8915-e8616851bb1c"
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf",
  #uuid "010a1688-47ff-4055-8da5-1f02247351e1"
  #uuid "e6c35131-f4d9-432d-8915-e8616851bb1c",
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"
  #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577",
  #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
  #uuid "010a1688-47ff-4055-8da5-1f02247351e1"},
 :accepted {},
 :prepared
 {#uuid "010a1688-47ff-4055-8da5-1f02247351e1"
  #uuid "bf8fd5fc-30fd-424c-af6a-0b32568581a4"},
 :peer-state
 {#uuid "e6c35131-f4d9-432d-8915-e8616851bb1c" :idle,
  #uuid "010a1688-47ff-4055-8da5-1f02247351e1" :idle,
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf" :idle,
  #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577" :idle},
 :peers
 [#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"
  #uuid "010a1688-47ff-4055-8da5-1f02247351e1"
  #uuid "e6c35131-f4d9-432d-8915-e8616851bb1c"],
 :job-scheduler :onyx.job-scheduler/greedy}
====
Log Entry #11
Entry is {:message-id 11, :fn :notify-join-cluster, :args {:observer #uuid "bf8fd5fc-30fd-424c-af6a-0b32568581a4", :subject #uuid "e6c35131-f4d9-432d-8915-e8616851bb1c"}}
Replica is:
{:pairs
 {#uuid "e6c35131-f4d9-432d-8915-e8616851bb1c"
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf",
  #uuid "010a1688-47ff-4055-8da5-1f02247351e1"
  #uuid "e6c35131-f4d9-432d-8915-e8616851bb1c",
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"
  #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577",
  #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
  #uuid "010a1688-47ff-4055-8da5-1f02247351e1"},
 :accepted
 {#uuid "010a1688-47ff-4055-8da5-1f02247351e1"
  #uuid "bf8fd5fc-30fd-424c-af6a-0b32568581a4"},
 :prepared {},
 :peer-state
 {#uuid "e6c35131-f4d9-432d-8915-e8616851bb1c" :idle,
  #uuid "010a1688-47ff-4055-8da5-1f02247351e1" :idle,
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf" :idle,
  #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577" :idle},
 :peers
 [#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"
  #uuid "010a1688-47ff-4055-8da5-1f02247351e1"
  #uuid "e6c35131-f4d9-432d-8915-e8616851bb1c"],
 :job-scheduler :onyx.job-scheduler/greedy}
====
Log Entry #12
Entry is {:message-id 12, :fn :accept-join-cluster, :args {:observer #uuid "bf8fd5fc-30fd-424c-af6a-0b32568581a4", :subject #uuid "e6c35131-f4d9-432d-8915-e8616851bb1c", :accepted-observer #uuid "010a1688-47ff-4055-8da5-1f02247351e1", :accepted-joiner #uuid "bf8fd5fc-30fd-424c-af6a-0b32568581a4"}}
Replica is:
{:pairs
 {#uuid "bf8fd5fc-30fd-424c-af6a-0b32568581a4"
  #uuid "e6c35131-f4d9-432d-8915-e8616851bb1c",
  #uuid "e6c35131-f4d9-432d-8915-e8616851bb1c"
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf",
  #uuid "010a1688-47ff-4055-8da5-1f02247351e1"
  #uuid "bf8fd5fc-30fd-424c-af6a-0b32568581a4",
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"
  #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577",
  #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
  #uuid "010a1688-47ff-4055-8da5-1f02247351e1"},
 :accepted {},
 :prepared {},
 :peer-state
 {#uuid "bf8fd5fc-30fd-424c-af6a-0b32568581a4" :idle,
  #uuid "e6c35131-f4d9-432d-8915-e8616851bb1c" :idle,
  #uuid "010a1688-47ff-4055-8da5-1f02247351e1" :idle,
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf" :idle,
  #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577" :idle},
 :peers
 [#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"
  #uuid "010a1688-47ff-4055-8da5-1f02247351e1"
  #uuid "e6c35131-f4d9-432d-8915-e8616851bb1c"
  #uuid "bf8fd5fc-30fd-424c-af6a-0b32568581a4"],
 :job-scheduler :onyx.job-scheduler/greedy}
====
Log Entry #13
Entry is {:message-id 13, :fn :submit-job, :args {:id #uuid "b784ebb4-356f-4e16-8eac-60e051d69ab7", :tasks [#uuid "ce13205e-937b-4af6-9aa9-d5149b31fb2c" #uuid "948f8595-3a0a-4318-b128-91c1d22c0158" #uuid "fb86b977-d668-4c98-abaa-80ee0d29663a"], :task-scheduler :onyx.task-scheduler/round-robin, :saturation Infinity, :task-saturation {#uuid "ce13205e-937b-4af6-9aa9-d5149b31fb2c" Infinity, #uuid "948f8595-3a0a-4318-b128-91c1d22c0158" Infinity, #uuid "fb86b977-d668-4c98-abaa-80ee0d29663a" Infinity}}}
Replica is:
{:job-scheduler :onyx.job-scheduler/greedy,
 :saturation {#uuid "b784ebb4-356f-4e16-8eac-60e051d69ab7" Infinity},
 :peers
 [#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"
  #uuid "010a1688-47ff-4055-8da5-1f02247351e1"
  #uuid "e6c35131-f4d9-432d-8915-e8616851bb1c"
  #uuid "bf8fd5fc-30fd-424c-af6a-0b32568581a4"],
 :accepted {},
 :jobs [#uuid "b784ebb4-356f-4e16-8eac-60e051d69ab7"],
 :tasks
 {#uuid "b784ebb4-356f-4e16-8eac-60e051d69ab7"
  [#uuid "ce13205e-937b-4af6-9aa9-d5149b31fb2c"
   #uuid "948f8595-3a0a-4318-b128-91c1d22c0158"
   #uuid "fb86b977-d668-4c98-abaa-80ee0d29663a"]},
 :pairs
 {#uuid "bf8fd5fc-30fd-424c-af6a-0b32568581a4"
  #uuid "e6c35131-f4d9-432d-8915-e8616851bb1c",
  #uuid "e6c35131-f4d9-432d-8915-e8616851bb1c"
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf",
  #uuid "010a1688-47ff-4055-8da5-1f02247351e1"
  #uuid "bf8fd5fc-30fd-424c-af6a-0b32568581a4",
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"
  #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577",
  #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
  #uuid "010a1688-47ff-4055-8da5-1f02247351e1"},
 :allocations {#uuid "b784ebb4-356f-4e16-8eac-60e051d69ab7" {}},
 :prepared {},
 :peer-state
 {#uuid "bf8fd5fc-30fd-424c-af6a-0b32568581a4" :idle,
  #uuid "e6c35131-f4d9-432d-8915-e8616851bb1c" :idle,
  #uuid "010a1688-47ff-4055-8da5-1f02247351e1" :idle,
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf" :idle,
  #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577" :idle},
 :task-schedulers
 {#uuid "b784ebb4-356f-4e16-8eac-60e051d69ab7"
  :onyx.task-scheduler/round-robin},
 :task-saturation
 {#uuid "b784ebb4-356f-4e16-8eac-60e051d69ab7"
  {#uuid "ce13205e-937b-4af6-9aa9-d5149b31fb2c" Infinity,
   #uuid "948f8595-3a0a-4318-b128-91c1d22c0158" Infinity,
   #uuid "fb86b977-d668-4c98-abaa-80ee0d29663a" Infinity}}}
====
...
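Each entry in the output above is a plain map, so reacting to a particular kind of event amounts to dispatching on its :fn key. The following is a minimal sketch using two of the entry shapes printed above. The function name describe-entry and the wording of the messages are hypothetical.

```clojure
;; Hypothetical handler: turn selected log entries into a short description.
;; Returns nil for entries we are not interested in.
(defn describe-entry [entry]
  (case (:fn entry)
    :accept-join-cluster (str "peer joined: "
                              (get-in entry [:args :accepted-joiner]))
    :submit-job          (str "job submitted: "
                              (get-in entry [:args :id]))
    nil))

(describe-entry
 {:message-id 3
  :fn :accept-join-cluster
  :args {:accepted-joiner #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"}})
;; => "peer joined: 7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"
```

Inside the subscription loop, you could call such a function on each entry read from the channel and print or forward any non-nil result.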

Plugins

Plugins serve as an abstraction to compose mechanisms for getting data in and out of Onyx. See the README.md of the project for a list of official Onyx plugins, or keep reading to roll your own.

Interfaces

In order to implement a plugin, one or more protocols need to be implemented from the plugin interfaces.

Input Plugin

Required interface implementations:

  • Plugin (stop/start)

  • Input

  • Checkpointed

  • BarrierSynchronization

Output Plugin

Required interface implementations:

  • Plugin (stop/start)

  • Output

  • Checkpointed

  • BarrierSynchronization

Templates

To help move past the boilerplate of creating new plugins, use Leiningen with onyx-plugin to generate a template.

onyx-core-async

Onyx plugin providing read and write facilities for Clojure core.async.

Installation

This plugin is included with Onyx. You do not need to add it as a separate dependency.

In your peer boot-up namespace:

(:require [onyx.plugin.core-async])

Functions

read-from-chan

Catalog entry:

{:onyx/name :in
 :onyx/plugin :onyx.plugin.core-async/input
 :onyx/type :input
 :onyx/medium :core.async
 :onyx/batch-size batch-size
 :onyx/max-peers 1
 :onyx/doc "Reads segments from a core.async channel"}

Lifecycle entries:

[{:lifecycle/task :your-task-name
  :lifecycle/calls :my.ns/in-calls}
 {:lifecycle/task :your-task-name
  :lifecycle/calls :onyx.plugin.core-async/reader-calls}]

There’s a little extra baggage with core.async because you need a reference to the channel. Make sure that my.ns/in-calls is a map that references a function to inject the channel in:

(def in-chan (chan capacity))
(def in-buffer (atom {}))

(defn inject-in-ch [event lifecycle]
  {:core.async/buffer in-buffer
   :core.async/chan in-chan})

(def in-calls
  {:lifecycle/before-task-start inject-in-ch})

write-to-chan

Catalog entry:

{:onyx/name :out
 :onyx/plugin :onyx.plugin.core-async/output
 :onyx/type :output
 :onyx/medium :core.async
 :onyx/batch-size batch-size
 :onyx/max-peers 1
 :onyx/doc "Writes segments to a core.async channel"}

Lifecycle entries:

[{:lifecycle/task :your-task-name
  :lifecycle/calls :my.ns/out-calls}
 {:lifecycle/task :your-task-name
  :lifecycle/calls :onyx.plugin.core-async/writer-calls}]

Again, as with read-from-chan, there’s a little extra to do since core.async has some exceptional behavior compared to other plugins:

(def out-chan (chan capacity))

(defn inject-out-ch [event lifecycle]
  {:core.async/chan out-chan})

(def out-calls
  {:lifecycle/before-task-start inject-out-ch})

Utility Functions

take-segments!

This additional function is provided as a utility for removing segments from a channel until nothing is read for a provided timeout (in milliseconds).
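The behavior described above, reading until the channel has been quiet for a given timeout, can be approximated in plain core.async as follows. This is a sketch of the idea rather than the plugin's implementation, and drain-until-quiet is a hypothetical name.

```clojure
(require '[clojure.core.async :refer [chan >!! alts!! timeout]])

;; Hypothetical helper: collect values from ch until either the channel is
;; closed or nothing arrives for timeout-ms milliseconds.
(defn drain-until-quiet [ch timeout-ms]
  (loop [segments []]
    ;; A fresh timeout channel is created for every read attempt.
    (let [[v port] (alts!! [ch (timeout timeout-ms)])]
      (if (and (= port ch) (some? v))
        (recur (conj segments v))
        segments))))

(def example-chan (chan 10))
(>!! example-chan {:n 1})
(>!! example-chan {:n 2})

(def drained (drain-until-quiet example-chan 50))
;; drained => [{:n 1} {:n 2}]
```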

onyx-seq

Onyx plugin for reading from a seq. The seq will be read from in a way that is appropriate for use with lazy-seqs. Therefore, this plugin can be useful with datomic.api/datoms calls, slow lazy calculations, line-seq / buffered reading, etc.

Installation

This plugin is included with Onyx. You do not need to add it as a separate dependency.

In your peer boot-up namespace:

(:require [onyx.plugin.seq])

Functions

sample-entry

Catalog entry:

{:onyx/name :in
 :onyx/plugin :onyx.plugin.seq/input
 :onyx/type :input
 :onyx/medium :seq
 :seq/checkpoint? true
 :onyx/batch-size batch-size
 :onyx/max-peers 1
 :onyx/doc "Reads segments from seq"}

Lifecycle entry:

[{:lifecycle/task :in
  :lifecycle/calls :onyx.plugin.seq/reader-calls}]

Ensure that the elements of your seq are maps. If they are not, it may help to bundle them into maps by transforming the elements before returning the seq from the lifecycle.

e.g. (map (fn [a] {:val a}) ["A" "B" "C"])

Checkpointing

The plugin will checkpoint the state of the read results. In the case of a virtual peer crash, the new virtual peer will drop from the seq until it has reached the first segment that was not fully acked. In order for this process to work, the seq that is read from must be reproducible on restart. If it is not, please disable checkpointing via :seq/checkpoint?.
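To see why reproducibility matters, consider this sketch of the recovery step. It is an illustration only, not the plugin's implementation, and the names make-segments and resume-from are hypothetical. Recovery rebuilds the seq and skips the elements that were already fully processed, which only yields the right segments if the rebuilt seq has the same elements in the same order.

```clojure
;; Hypothetical: a function that rebuilds the same seq every time it is called.
(defn make-segments []
  (map (fn [a] {:val a}) ["A" "B" "C" "D"]))

;; Hypothetical: resume by rebuilding the seq and dropping what was
;; already fully processed before the crash.
(defn resume-from [make-seq n-fully-processed]
  (drop n-fully-processed (make-seq)))

(resume-from make-segments 2)
;; => ({:val "C"} {:val "D"})
```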

Example Use - Buffered Line Reader

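(import '[java.io BufferedReader FileReader])

;; Opens the file named in the lifecycle entry and exposes its lines as a seq.
(defn inject-in-reader [event lifecycle]
  (let [rdr (FileReader. (:buffered-reader/filename lifecycle))]
    {:seq/rdr rdr
     :seq/seq (line-seq (BufferedReader. rdr))}))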

(defn close-reader [event lifecycle]
  (.close (:seq/rdr event)))

(def in-calls
  {:lifecycle/before-task-start inject-in-reader
   :lifecycle/after-task-stop close-reader})

;; lifecycles

(def lifecycles
  [{:lifecycle/task :in
    :buffered-reader/filename "test-resources/lines.txt"
    :lifecycle/calls ::in-calls}
   {:lifecycle/task :in
    :lifecycle/calls :onyx.plugin.seq/reader-calls}])

Logging

This chapter details how to inspect and modify the logs that Onyx produces.

Timbre

Onyx uses Timbre for logging.

By default, all Onyx output is logged to a file called onyx.log in the same directory as where the peer or coordinator jar is executing. The logging configuration can be overridden completely; see below.

Onyx’s default logging configuration also writes all WARN and FATAL messages to standard out.

Overriding the log file name and path

In order to override the log file location, add :onyx.log/file to both the environment config sent into start-env and the peer config sent into start-peer-group.

Both relative and absolute paths are supported by Timbre.

Overriding the Timbre log config

Similarly, to override the full Timbre log config map, construct the Timbre configuration and add it to both the environment and peer config maps under the :onyx.log/config key. Note that the :onyx.log/config map will be merged with the existing Timbre configuration rather than replacing it completely. In practice this means that extra configuration must be sent in to, for example, disable appenders that are enabled by default. See the examples below.

Examples

The following example simply changes the output file.

(let [log-config {:onyx.log/file "/var/log/onyx.log"}
      peer-config (merge my-peer-config log-config)
      env-config (merge my-env-config log-config)]
  (onyx.api/start-env env-config)
  (onyx.api/start-peer-group peer-config)

  ;; ...
  )

This example uses Timbre to redirect Onyx logs into the regular Java logging system using the log-config library.

(require '[com.palletops.log-config.timbre.tools-logging :as tl])
(let [log-config {:onyx.log/config {:appenders
                                    {:spit {:enabled? false}
                                     :standard-out {:enabled? false}
                                     :rotor {:enabled? false}
                                     :jl (tl/make-tools-logging-appender
                                          {:enabled? true
                                           :fmt-output-opts {:nofonts? true}})}
                                    :min-level :trace}}
      peer-config (merge my-peer-config log-config)
      env-config (merge my-env-config log-config)]
  (onyx.api/start-env env-config)
  (onyx.api/start-peer-group peer-config)

  ;; ...
  )
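The appenders are disabled explicitly in the example above because of the merge behavior. As a rough sketch, assuming the merge is applied to nested maps, an override only touches the keys it names and leaves everything else as it was. The deep-merge helper and the contents of default-config here are hypothetical stand-ins, not Onyx's or Timbre's actual code or defaults.

```clojure
;; Hypothetical helper: recursively merge nested maps, with later maps winning.
(defn deep-merge [& maps]
  (if (every? map? maps)
    (apply merge-with deep-merge maps)
    (last maps)))

;; Stand-in for an existing logging configuration (assumed values).
(def default-config
  {:appenders {:spit {:enabled? true}
               :standard-out {:enabled? true}}})

(deep-merge default-config
            {:appenders {:standard-out {:enabled? false}}})
;; => {:appenders {:spit {:enabled? true}, :standard-out {:enabled? false}}}

;; Merging in an empty map leaves the existing configuration unchanged.
(deep-merge default-config {})
;; => {:appenders {:spit {:enabled? true}, :standard-out {:enabled? true}}}
```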

If you already have Timbre logging configured somewhere in your code base, you can specify :onyx.log/config {} to ensure your settings remain unchanged. In effect, this simply merges the empty map into whatever settings you may have already specified for logging.

(let [log-config {:onyx.log/config {}}
      peer-config (merge my-peer-config log-config)
      env-config (merge my-env-config log-config)]
  (onyx.api/start-env env-config)
  (onyx.api/start-peer-group peer-config)

  ;; ...
  )

See the Timbre example configuration for more information on valid values for the :onyx.log/config map.

Examples

Examples Project

See the Examples Project for a set of self-contained demonstrations of specific Onyx functionality.

Frequently Asked Questions

My job’s tasks never start

When your job’s tasks don’t start, you won’t see any messages in onyx.log on the peer that read something like:

Job fb2f2b80-2c5a-41b6-93c8-df35bffe6915 {:job-name :click-stream} - Task da5795a6-bd4c-41a1-843d-dda84cf5c615 :inc - Peer 2b35433a-b935-43b7-881b-4f4ec16672cc - Warming up task lifecycle {:id #uuid "da5795a6-bd4c-41a1-843d-dda84cf5c615", :name :inc, :egress-ids {:out #uuid "2c210c3e-6411-45fb-85a7-6cc733118215"}}

Instead, the log would show peers starting, but not doing anything. This looks something like:

Starting Virtual Peer 111dcc44-7f53-41a4-8548-047803e8d441

Resolutions

Do you have enough virtual peers provisioned?

When a job is submitted to Onyx, it will only begin when there are enough virtual peers to sustain its execution. How many are enough? That depends. One virtual peer may work on at most one task at any given time. By default, each task in your workflow requires one available virtual peer to run it. If a grouping task is used, or if you set :onyx/min-peers on the task’s catalog entry, the number may be greater than 1. You can statically determine how many peers a job needs for execution with this simple helper function available in Onyx core.

As an example, if you submit a job with 10 tasks in its workflow, you need at least 10 available virtual peers for your job to begin running. If you only have 8, nothing will happen. If you have more than 10 (assuming that 10 is truly the minimum that you need), Onyx may attempt to dedicate more resources to your job beyond 10 peers, depending on which scheduler you used. You can control the number of virtual peers that you’re running by modifying the parameters to onyx.api/start-peers.
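The counting rule described in this section can be sketched over a catalog as below. This is not the helper function mentioned above, and the names min-required-peers and enough-peers? are hypothetical. It only accounts for :onyx/min-peers, defaulting to one peer per task, and ignores any other setting that may raise a task's requirement.

```clojure
;; Hypothetical helper: sum the minimum number of peers over all catalog
;; entries, counting 1 for any task that does not set :onyx/min-peers.
(defn min-required-peers [catalog]
  (reduce + (map (fn [entry] (or (:onyx/min-peers entry) 1)) catalog)))

;; Hypothetical helper: can a job with this catalog start at all?
(defn enough-peers? [catalog n-available-peers]
  (>= n-available-peers (min-required-peers catalog)))

(def example-catalog
  [{:onyx/name :in}
   {:onyx/name :group-by-user :onyx/min-peers 2}
   {:onyx/name :out}])

(min-required-peers example-catalog) ;; => 4
(enough-peers? example-catalog 3)    ;; => false, the job will not start
(enough-peers? example-catalog 4)    ;; => true
```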

We don’t emit a warning that your cluster is underprovisioned in terms of number of peers because its masterless design makes this difficult to asynchronously detect. This has been discussed in an issue. Onyx is designed to handle under and over provisioning as part of its normal flow, so this is not considered an error condition.

See the Scheduling chapter of the User Guide for a full explanation.

Are you using the Greedy job scheduler?

You’ve hit this case if you have more than one job running concurrently. When you started your peers, you specified a job scheduler through the configuration key :onyx.peer/job-scheduler. If you specified :onyx.job-scheduler/greedy, you’ve asked that Onyx dedicate all cluster resources to the first submitted job until that job is completed. When the job is completed, all resources will be assigned to the next job available, and so on. If you want more than one job running at a time, consider using the Balanced (:onyx.job-scheduler/balanced) or Percentage (:onyx.job-scheduler/percentage) job schedulers. See the Scheduling chapter of this User Guide for a full explanation.
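For instance, a peer config that selects the Balanced job scheduler might look like the sketch below. The ZooKeeper address and tenancy ID are placeholder values, and any other keys your deployment needs are omitted.

```clojure
;; Placeholder values: adjust the address and tenancy ID for your cluster.
(def peer-config
  {:zookeeper/address "127.0.0.1:2181"
   :onyx/tenancy-id "my-cluster"
   :onyx.peer/job-scheduler :onyx.job-scheduler/balanced})
```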

Peers repeatedly echo that they’re warming up

If you see something like the following repeatedly being written to onyx.log, you’re in the right place:

Job fb2f2b80-2c5a-41b6-93c8-df35bffe6915 {:job-name :click-stream} - Task 2c210c3e-6411-45fb-85a7-6cc733118215 :out - Peer 3b2c6667-8f41-47a9-ba6b-f97c81ade828 - Peer chose not to start the task yet. Backing off and retrying...

What’s happening here is that your job’s tasks have started execution. Peers have been assigned tasks, and are beginning to get things in order to perform work on them. Before a peer can truly begin running a task, it must do three things:

  1. Call all lifecycle hooks registered under :lifecycle/start-task? sequentially, accruing their results.

  2. Call the constructor for the input or output plugin, if this is an input or output task.

  3. Emit a signal back to ZooKeeper indicating that this peer has opened up all of its sockets to receiving incoming communication from other peers.

The results from the first step are a sequence of boolean values designating whether the peer is allowed to start this task. If any of the start-task? lifecycle calls return false, the peer sleeps for a short period to back off, then performs step 1 again. This process continues until all lifecycle calls return true. Read the Lifecycles chapter of this User Guide to learn why it’s useful to repeatedly try and back off from task launching.

The second step will only occur after step 1 successfully completes - that is, all lifecycle functions return true. The reason you see "Backing off and retrying..." in the logs is because step 1 is being repeated again.

Resolutions

Do all your :lifecycle/start-task? functions eventually return true?

If any of your lifecycle hooks for :lifecycle/start-task? return false or nil, you’ll want to change them to eventually return true based upon some condition.
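As a minimal sketch, a start-task? hook that eventually returns true can be gated on some external condition. Here the condition is an atom that other code flips when a dependency is ready. The names dependency-ready?, start-when-ready?, and gate-calls are hypothetical.

```clojure
;; Hypothetical readiness flag, set to true elsewhere once a dependency is up.
(def dependency-ready? (atom false))

;; Lifecycle hooks take the event map and the lifecycle entry.
;; Always return a real boolean rather than nil.
(defn start-when-ready? [event lifecycle]
  (boolean @dependency-ready?))

(def gate-calls
  {:lifecycle/start-task? start-when-ready?})
```

While the flag is false the peer keeps backing off and retrying, and the task starts soon after the flag is set to true.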

Are the connections that your plugin opens valid?

As per step 2, a plugin’s constructor will be called to open any relevant stateful connections, such as a database or socket connection. Many connection calls, such as opening a JDBC SQL connection, will block for prolonged periods of time before failing, unless otherwise configured. If the task that appears to be stuck is an input or output task, this is likely the cause. You may want to reconfigure your initial connections to fail faster to make it more obvious as to what’s happening.

None of my messages are being processed

If your onyx.log shows messages that read as follows, your job’s tasks have successfully started:

Job fb2f2b80-2c5a-41b6-93c8-df35bffe6915 {:job-name :click-stream} - Task da5795a6-bd4c-41a1-843d-dda84cf5c615 :inc - Peer 2b35433a-b935-43b7-881b-4f4ec16672cc - Enough peers are active, starting the task

If you suspect that messages are not being processed, the cause heavily depends on the input plugin that you’re using.

Resolutions

Try using :onyx/fn to log all incoming messages

One thing that you can do for extra visibility is to log all incoming messages from input tasks. This is inadvisable for production, but can be useful for temporary debugging. You can specify an :onyx/fn transformation function to any task, including inputs and outputs. It can be useful to specify a debug function on your input tasks to see which messages are entering the system. Be sure that this function returns the original segment! For example, you can define a function:

(defn spy [segment]
  (println "Read from input:" segment)
  segment)

Then add :onyx/fn ::spy to your input catalog entries.

Has your job been killed?

Unless otherwise configured by your Lifecycles, if any user-level code throws an exception, the job will be killed and is no longer eligible for execution. Check onyx.log on all your peers to ensure that no exceptions were thrown. If this were the case, you’d see messages lower in the log reading:

Job fb2f2b80-2c5a-41b6-93c8-df35bffe6915 {:job-name :click-stream} - Task 1e83e005-3e2d-4307-907b-a3d66e3aa293 :in - Peer 111dcc44-7f53-41a4-8548-047803e8d441 - Stopping task lifecycle

Are you using the Kafka input plugin?

If you’re using the Kafka input plugin, make sure that you’re reading from a reasonable starting offset of the topic. If you’ve set :kafka/force-reset? to true in the catalog entry, and you’ve also set :kafka/offset-reset to :largest, you’ve instructed Onyx to begin reading messages from the end of the topic. Until you place more messages into the topic, Onyx will sit idle waiting for more input. The starting offset for each input task using Kafka is echoed out in onyx.log.
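As a sketch, the two catalog keys discussed above sit in the input task's catalog entry. The task name and var name below are hypothetical, the remaining keys the Kafka plugin requires are left out, and :smallest is assumed here to be the counterpart of :largest that starts from the beginning of the topic. Check the plugin's documentation for the values your version accepts.

```clojure
;; Fragment of a Kafka input catalog entry; other required keys are omitted.
(def kafka-offset-opts
  {:onyx/name :read-messages       ;; hypothetical task name
   :onyx/type :input
   :kafka/force-reset? true
   ;; Assumption: :smallest starts from the beginning of the topic,
   ;; whereas :largest starts from the end and waits for new messages.
   :kafka/offset-reset :smallest})
```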

The same messages are being replayed multiple times

Message replay happens when a message enters Onyx from an input source, gets processed, and is seen again at a later point in time. Onyx replays messages for fault tolerance when it suspects that a failure of some sort has occurred. You can read about how message replay is implemented, and why it exists, in the Architecture chapter of this User Guide.

There are many reasons why a message may need to be replayed (every possible failure scenario), so we will limit our discussion to controlling replay frequency. See the performance tuning sections of this document for more context about what value is appropriate to set for the replay frequency.

Resolutions

Is your :onyx/pending-timeout too low?

Messages are replayed from the input source if they do not complete their route through the cluster within a particular period of time. This period is controlled by the :onyx/pending-timeout parameter of the catalog entry, and its default is 60 seconds. You can read about its specifics in the Cheatsheet. You should set this value high enough that any segment taking longer than this value to complete is highly likely to have encountered a failure scenario.
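
As a rough sketch, raising the timeout on an input task could look like the fragment below. It assumes that the value is expressed in milliseconds (so the 60 second default would be 60000); check the Cheatsheet for your version before relying on the unit.

```clojure
;; Fragment of an input catalog entry; all other keys are omitted.
;; Assumption: the value is in milliseconds, so this allows two minutes
;; before a segment is considered lost and replayed.
{:onyx/pending-timeout 120000}
```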

My program starts running, but then it stalls

Programs that begin healthy by processing messages and then stall are typically indicative of user-level code problems. We outline a few common cases here.

Resolutions

Does onyx.log have any exceptions in it?

Most exceptions will kill the job in question. If you are simply monitoring progress by reading from an output data source through Onyx, you should check all of the peer onyx.log files for exceptions that may have killed the job.

Are any user-level functions blocking?

Any implementation of :onyx/fn that blocks will halt the progress of all other segments that are lined up directly behind it. Ensure that user-level functions finish in a timely manner.

Are messages being replayed?

To get started, see the full section on how and why messages are replayed. In short, with the default :onyx/pending-timeout, messages are replayed if they are not completed within 60 seconds. You may be experiencing initial success, followed by a runtime error that temporarily loses segments until they are replayed.

Are you using a core.async output plugin?

If you’re using a core.async output plugin that writes to a channel whose writes block when the buffer is full, you may have put enough messages onto the channel to fill its buffer. Writes to the channel are then blocking, and hence stalling Onyx. Make sure that something is reading from the channel, or give it a larger buffer.
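
The effect can be reproduced outside of Onyx with plain core.async. The sketch below is illustrative only: out-chan, results, and after-take are names made up for this example, and offer! (a non-blocking put) stands in for the blocking write so that the example never hangs.

```clojure
(require '[clojure.core.async :as a])

;; A deliberately tiny fixed buffer makes the effect easy to see.
(def out-chan (a/chan 2))

;; offer! never blocks; it only succeeds while the buffer has room.
;; The first two offers fit into the buffer, the third does not.
;; A blocking write at that point would wait until a reader takes a value.
(def results (mapv #(a/offer! out-chan {:n %}) (range 3)))

;; Taking one value frees a slot, so the next write is accepted again.
(a/<!! out-chan)
(def after-take (a/offer! out-chan {:n 3}))
```

In a real job, the same reasoning applies to whatever code consumes the output channel: if nothing takes from it, the buffer eventually fills.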

Are your peer hosts and ports advertised correctly?

Ensure that the host and port that the peer advertises to the rest of the cluster for incoming connections are correct. If they are incorrect, only tasks that are colocated on the same machine will have a chance of working. Remember that Onyx sends its messages over UDP, so make sure that your security settings allow traffic over that protocol.

The host is configured via the :onyx.messaging/bind-addr key, and the port is configured via the :onyx.messaging/peer-port key.
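
A minimal sketch of the relevant part of a peer configuration map follows. The address and port are placeholders; substitute values that the other peers in your cluster can actually reach.

```clojure
;; Fragment of a peer configuration map; all other keys are omitted.
{:onyx.messaging/bind-addr "10.0.0.12"  ; hypothetical address reachable by the other peers
 :onyx.messaging/peer-port 40200}       ; hypothetical port; UDP traffic to it must be allowed
```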

Peer fails to start, and throws

java.io.IOException: No space left on device

This exception commonly occurs when running Onyx inside of a Docker container. Aeron requires more shared memory than the container allocates by default. You can solve this problem by starting your container with a larger amount of shared memory by specifying --shm-size on Docker >= 1.10.

Aeron Mediadriver crashes the JVM with SIGBUS

This crash can occur when Aeron does not have enough shared memory. Increase the amount of shared memory as described above.

Peer fails to start, and throws

java.lang.IllegalStateException: aeron cnc file version not understood

This exception occurs when Aeron’s version is upgraded or downgraded between incompatible versions. The exception will also provide a path on the OS to some Aeron files. Shut down the peer, delete that directory, then restart the peer.

Peer fails to start, and throws

Failed to connect to the Media Driver - is it currently running?

This error is thrown when the peer tries to start, but can’t connect to Aeron in its local environment. Aeron can be run in embedded mode by setting :onyx.messaging.aeron/embedded-driver? to true, or out of process on the peer machine, which is the recommended production setting. If you’re running it out of process, check that the media driver is still running when you encounter this message. You should run Aeron under a process monitoring tool such as monit when running it out of process.
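
For development, the embedded mode mentioned above could be switched on with a peer configuration fragment like this one. It is a sketch only; production deployments would normally leave the key false and run the media driver out of process.

```clojure
;; Fragment of a peer configuration map for development use; other keys omitted.
{:onyx.messaging.aeron/embedded-driver? true} ; start the Aeron media driver inside the peer JVM
```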

Peer fails to start, and throws

uk.co.real_logic.aeron.driver.exceptions.ActiveDriverException: active driver detected

You have encountered the following exception:

uk.co.real_logic.aeron.driver.exceptions.ActiveDriverException: active driver detected
  clojure.lang.ExceptionInfo: Error in component :messaging-group in system onyx.system.OnyxPeerGroup calling #'com.stuartsierra.component/start

This is because you have started your peer-group twice without shutting it down. Alternatively, you may be using :onyx.messaging.aeron/embedded-driver? true in your peer-group and starting a media driver externally. Only one media driver can be started at a time.

Application fails to build uberjar, and throws

java.lang.UnsupportedClassVersionError: uk/co/real_logic/aeron/Aeron$Context : Unsupported major.minor version 52.0

You have encountered the following exception:

java.lang.UnsupportedClassVersionError: uk/co/real_logic/aeron/Aeron$Context : Unsupported major.minor version 52.0

This is because you are trying to build or run an Onyx application with a Java version lower than 1.8; class file version 52.0 corresponds to Java 1.8. Onyx supports Java 1.8 only.

My program begins running, but throws

No implementation of method: :read-char of protocol: #'clojure.tools.reader.reader-types/Reader found for class

You’ll encounter this exception when your :onyx/fn returns something that is not EDN and Nippy serializable, which is required to send it over the network. Ensure that :onyx/fn returns either a map or a vector of maps. All values within must be EDN serializable.
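
The required return shape can be illustrated with two hypothetical functions. bad-inc returns a bare number, which is not a segment; good-inc returns the segment itself with the updated value. Both names and the :n key are made up for this example.

```clojure
;; Problematic: returns a bare number instead of a segment.
(defn bad-inc [segment]
  (inc (:n segment)))

;; Better: returns a map, so the result is still a valid segment.
(defn good-inc [segment]
  (update-in segment [:n] inc))
```

Because these are plain functions, you can check the return shape at the REPL before running a job.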

What does Onyx use internally for compression by default?

Unless otherwise overridden in the Peer Pipeline API, Onyx will use Nippy. This can be overridden by setting :onyx.messaging/compress-fn and :onyx.messaging/decompress-fn in the peer configuration. See the Information Model documentation for more information.

How can I filter segments from being output from my tasks?

Use Flow Conditions or return an empty vector from your :onyx/fn.
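
A minimal sketch of the second option, assuming segments carry a numeric :n key (keep-even is a hypothetical name):

```clojure
(defn keep-even
  "Hypothetical :onyx/fn: passes the segment on only when :n is even."
  [segment]
  (if (even? (:n segment))
    segment   ; emitted downstream unchanged
    []))      ; empty vector: nothing is emitted for this segment
```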

Can I return more than one segment from a function?

Return a vector of maps from :onyx/fn instead of a map. All maps at the top level of the vector will be unrolled and pushed downstream.
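
For instance, a function along these lines would emit one segment per word. The split-words name and the :sentence and :word keys are assumptions made for this example.

```clojure
(require '[clojure.string :as str])

(defn split-words
  "Hypothetical :onyx/fn: emits one segment per word found in :sentence."
  [segment]
  (mapv (fn [word] {:word word})
        (str/split (:sentence segment) #"\s+")))
```

Each map in the returned vector is treated as its own segment by downstream tasks.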

Should I be worried about user-level KeeperException in ZooKeeper logs?

You should monitor these; however, errors with KeeperErrorCode = NodeExists are probably fine:

2015-11-05 15:12:51,332 [myid:] - INFO  [ProcessThread(sid:0 cport:-1)::PrepRequestProcessor@645] - Got user-level KeeperException when processing sessionid:0x150d67d0cd10003 type:create cxid:0xa zxid:0x50 txntype:-1 reqpath:n/a Error Path:/onyx/0e14715d-51b9-4e2b-af68-d5292f276afc/windows Error:KeeperErrorCode = NodeExists for /onyx/0e14715d-51b9-4e2b-af68-d5292f276afc/windows

This is a peer just trying to recreate a ZooKeeper path that was already created by another peer, and it can be safely ignored.

How should I benchmark on a single machine?

Turn off messaging short circuiting, as it improves performance in a way that is unrealistic for multi-node use. Remember to turn it back on for production use, as it does improve performance overall.
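
As a sketch, short circuiting would be disabled through the peer configuration. The key name below is an assumption that is not stated in this chapter; confirm it against the Cheatsheet for your Onyx version.

```clojure
;; Fragment of a peer configuration map for single-machine benchmarking.
;; Assumption: this is the key that controls short circuiting in your version.
{:onyx.messaging/allow-short-circuit? false}
```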