What does Onyx offer?

In this chapter, I’ll enumerate and explain the reasons why we built Onyx.

An Information Model

Information models are often superior to APIs, and almost always better than DSLs. The hyper-flexibility of a data structure literal allows Onyx workflows and catalogs to be constructed at a distance, meaning on another machine, in a different language, by another program, etc.

The information model for an Onyx workflow has the distinct advantage that it’s possible to compile other workflow representations (perhaps a datalog or SQL query) into the workflow that Onyx understands. The Information Model chapter describes a target for data structure compilation.

Temporal Decoupling

Because Onyx places data at the highest level of importance, very few Onyx constructs need to be generated at the same time as program deployment or peer registration. Programs can create workflows, write them to a database, and pull them out at a later time without any problems.

Elimination of Macros

Macros are a tremendously powerful tool, but are often inappropriate for end-user consumption of an API. Onyx goes beyond Storm’s defbolt and defspout by making vanilla Clojure functions shine. These functions need no context to execute and do not require any dynamic bindings. They receive all information that they need via parameters, which are injected by Onyx’s task lifecycles.

Plain Clojure Functions

To the same point above, we want plain Clojure functions to be the building blocks for application logic. Onyx’s functions can be tested directly without any special test runner.

Testing Without Mocking

In general, your design is in trouble when you’ve reached for with-redefs or something along those lines to mock functions. Onyx places a high importance on programming against interfaces, and even more so on putting space between small components with channels. Onyx programs can be tested in development mode and moved to production mode with only a small configuration file change. If you’d like to change your input or output plugins, all you need to do is re-associate the catalog entry with something like an in-memory plugin. No interface mocking code required.

Easy Parameterization of Workflows

It’s particularly telling that many compute frameworks don’t offer an easy way to parameterize workflows. Onyx puts space between the caller and the function definition. Parameterize tasks inside the catalog, and update the catalog entry at will. Additionally, Onyx allows peers to spin up their own parameters at boot-up time.

Transparent Code Reuse for Batch and Streaming

Onyx uses the notion of a sentinel value to transparently switch between streaming and batching modes. This makes it easy to reuse the same code for both batch and streaming computations.

Aspect Orientation

Clojure functions again serve as a huge win. Dire is a library that supports aspects, meaning you can keep your application logic cleanly separated from logging, preconditions, and error handling.

AOT Nothing

Onyx AOT compiles absolutely nothing on your behalf. When you’re ready to stand your jar up, simply uberjar and start executing on the target machine. Hadoop and Storm cause dependency hell by providing their own dependencies on top of yours (in Storm’s case, you’re restricted to a particular version of Clojure because you’re locked in by the executor). Onyx won’t mess with your dependencies.

You can, however, AOT Onyx yourself to speed up compilation times.

Concepts

We’ll take a quick overview of some terms you’ll see in the rest of this user guide.

Terminology

Segment

A segment is the unit of data in Onyx, and it’s represented by a Clojure map. Segments represent the data flowing through the cluster. Segments are the only shape of data that Onyx allows you to emit between functions.

Task

A task is the smallest unit of work in Onyx. It represents an activity of either input, processing, or output.

Workflow

A workflow is the structural specification of an Onyx program. Its purpose is to articulate the paths that data flows through the cluster at runtime. It is specified via a directed, acyclic graph.

The workflow representation is a Clojure vector of vectors. Each inner vector contains exactly two elements, which are keywords. The keywords represent nodes in the graph, and the vector represents a directed edge from the first node to the second.

;;;    in
;;;    |
;;; increment
;;;    |
;;;    out
[[:in :increment] [:increment :out]]
;;;            input
;;;             /\
;;; processing-1 processing-2
;;;     |             |
;;;  output-1      output-2

[[:input :processing-1]
 [:input :processing-2]
 [:processing-1 :output-1]
 [:processing-2 :output-2]]
;;;            input
;;;             /\
;;; processing-1 processing-2
;;;         \      /
;;;          output

[[:input :processing-1]
 [:input :processing-2]
 [:processing-1 :output]
 [:processing-2 :output]]
Tip
Example projects: flat-workflow, multi-output-workflow

Catalog

All inputs, outputs, and functions in a workflow must be described via a catalog. A catalog is a vector of maps, strikingly similar to Datomic’s schema. Configuration and docstrings are described in the catalog.

Example:

[{:onyx/name :in
  :onyx/plugin :onyx.plugin.core-async/input
  :onyx/type :input
  :onyx/medium :core.async
  :onyx/batch-size batch-size
  :onyx/max-peers 1
  :onyx/doc "Reads segments from a core.async channel"}

 {:onyx/name :inc
  :onyx/fn :onyx.peer.min-peers-test/my-inc
  :onyx/type :function
  :onyx/batch-size batch-size}

 {:onyx/name :out
  :onyx/plugin :onyx.plugin.core-async/output
  :onyx/type :output
  :onyx/medium :core.async
  :onyx/batch-size batch-size
  :onyx/max-peers 1
  :onyx/doc "Writes segments to a core.async channel"}]

Flow Conditions

In contrast to a workflow, flow conditions specify, on a segment-by-segment basis, which direction data should flow, as determined by predicate functions. This is helpful for conditionally processing a segment based on its content.

Example:

[{:flow/from :input-stream
  :flow/to [:process-adults]
  :flow/predicate :my.ns/adult?
  :flow/doc "Emits segment if this segment is an adult."}]

Function

A function is a construct that receives segments and emits segments for further processing. It literally is a Clojure function.
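
For instance (the key names here are made up), a function is just an ordinary Clojure defn that takes a segment map and returns a segment map:

(defn add-timestamp [segment]
  ;; Takes one segment, returns one segment with an extra key.
  (assoc segment :processed-at (System/currentTimeMillis)))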

Lifecycle

A lifecycle is a construct that describes the lifetime of a task. There is an entire chapter devoted to lifecycles, but to be brief, a lifecycle allows you to hook in and execute arbitrary code at critical points during a task. A lifecycle carries a context map that you can merge results back into for use later.

Windows

Windows are a construct that partitions a possibly unbounded sequence of data into finite pieces, allowing aggregations to be specified. This lets you treat an infinite sequence of data as if it were finite over a given period of time.

Plugin

A plugin is a means for hooking into data sources to extract data as input and produce data as output. Onyx comes with a few plugins, but you can craft your own, too.

Sentinel

A sentinel is a value that can be pushed into Onyx to signal the end of a stream of data. This effectively lets Onyx switch between streaming and batching mode. The sentinel in Onyx is represented by the Clojure keyword :done.
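
For example, with an in-memory core.async input channel (the channel and segments here are illustrative), a bounded stream is terminated by pushing :done after the final segment:

(require '[clojure.core.async :refer [chan >!! close!]])

(def input-chan (chan 10))

(>!! input-chan {:n 1})
(>!! input-chan {:n 2})
;; The sentinel signals that no more segments will arrive on this stream.
(>!! input-chan :done)
(close! input-chan)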

Peer

A Peer is a node in the cluster responsible for processing data. A single "peer" refers to a physical machine, though we often use the terms peer and virtual peer interchangeably when the difference doesn’t matter.

Virtual Peer

A Virtual Peer refers to a single peer process running on a single physical machine. A single Virtual Peer executes at most one task at a time.

Job

A job is the collection of a workflow, catalog, flow conditions, lifecycles, and execution parameters. A job is the most coarse-grained unit of work, and every task is associated with exactly one job - hence a peer can be working on at most one job at any given time.

Architecture & Low Level Design

This chapter outlines how Onyx works on the inside to meet the required properties of a distributed data processing system. This is not a formal proof nor an iron-clad specification for other implementations of Onyx. I will do my best to be transparent about how everything is working under the hood - good and bad. If something doesn’t make sense, keep moving. There are inevitable forward references.

High Level Components

Peer

A Peer is a node in the cluster responsible for processing data. A peer generally refers to a physical machine, as it’s typical to only run one peer per machine.

Virtual Peer

A Virtual Peer refers to a single concurrent worker running on a single physical machine. Each virtual peer spawns a small number of threads since it uses asynchronous messaging. All virtual peers are equal, whether they are on the same physical machine or not. Virtual peers communicate segments directly to one another, and coordinate strictly via the log in ZooKeeper.

ZooKeeper

Apache ZooKeeper is used as both a storage and communication layer. ZooKeeper takes care of things like CAS, consensus, and atomic counting. ZooKeeper watches are at the heart of how Onyx virtual peers detect machine failure.

Aeron

Aeron is the primary messaging transport layer. The transport layer is pluggable, though we don’t support any other transports at this time since Aeron can "short-circuit" networking and act as fast as core.async.

The Log

This design centers around a totally ordered sequence of commands using a log structure. The log acts as an immutable history and arbiter. It’s maintained through ZooKeeper using a directory of persistent, sequential znodes. Virtual peers act as processes that consume from the log. At the time a peer starts up, it initializes its local replica to the "empty state". Log entries represent deterministic, idempotent functions to be applied to the local replica. The peer plays the log from the beginning, applying each log entry to its local replica. The local replica is transformed from one value to another.

As a reaction to each replica transformation, the peer may send more commands to the tail of the log. Peers may play the log at any rate. After each peer has played k log entries, all peers at time k will have exactly the same local replica.

Peers store everything in memory - so if a peer fails, it simply reboots from scratch with a new identifier and plays the log forward from the beginning. Since it has a new identifier, it has no association to the commands it previously issued; this prevents live lock issues.

The Inbox and Outbox

Every peer maintains its own inbox and outbox. Messages received appear in order in the inbox, and messages to-be-sent are placed in order in the outbox.

Messages arrive in the inbox as commands are proposed into the ZooKeeper log. Technically, the inbox need only be size 1 since all log entries are processed strictly in order. As an optimization, the peer can choose to read a few extra commands behind the one it’s currently processing. In practice, the inbox will probably be configured with a size greater than one.

The outbox is used to send commands to the log. Certain commands processed by the peer will generate other commands. For example, if a peer is idle and it receives a command notifying it about a new job, the peer will react by sending a command to the log requesting that it be allocated for work. Each peer can choose to pause or resume the sending of its outbox messages. This is useful when the peer is just acquiring membership to the cluster. It will have to play log commands to join the cluster fully, but it cannot volunteer to be allocated for work since it is not yet officially a member of the cluster.

Applying Log Entries

This section describes how log entries are applied to the peer’s local replica. A log entry is a persistent, sequential znode. Its content is a map with keys :fn and :args. :fn is mapped to a keyword that finds this log entry’s implementation. :args is mapped to another map with any data needed to apply the log entry to the replica.

Peers begin with the empty state value, and local state. Local state maintains a mapping of things like the inbox and outbox - things that are specific to this peer, and presumably can’t be serialized as EDN.

Each peer starts a thread that listens for additions to the log. When the log gets a new entry, the peer calls onyx.extensions/apply-log-entry. This is a function that takes a log entry and the replica, and returns a new replica with the log entry applied to it. This is a value-to-value transformation.
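
As a rough sketch of the idea (this is not the actual Onyx implementation - the entry shapes and the apply-entry multimethod below are purely illustrative), replaying the log amounts to a reduce of pure functions over an ordered sequence of entries:

;; Illustrative stand-in for onyx.extensions/apply-log-entry.
(defmulti apply-entry (fn [entry replica] (:fn entry)))

(defmethod apply-entry :submit-job
  [{:keys [args]} replica]
  (update replica :jobs (fnil conj []) (:id args)))

(defmethod apply-entry :kill-job
  [{:keys [args]} replica]
  (update replica :killed-jobs (fnil conj []) (:job args)))

;; Playing the log is a reduce over the ordered entries,
;; starting from the empty replica.
(reduce
 (fn [replica entry] (apply-entry entry replica))
 {}
 [{:fn :submit-job :args {:id "job-1"}}
  {:fn :kill-job :args {:job "job-1"}}])
;; => {:jobs ["job-1"], :killed-jobs ["job-1"]}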

diagram 1

A single peer begins with the empty replica ({}) and progressively applies log entries to the replica, advancing its state from one immutable value to the next.

diagram 2

A peer reads the first log entry and applies the function to its local replica, moving the replica into a state "as of" entry 0.

diagram 4

Because application of functions from the log against the replica is deterministic and free of side effects, peers do not need to coordinate about the speed at which each plays the log. Peers read the log on completely independent timelines.

Peers effect change in the world by reacting to log entries. When a log entry is applied, the peer calls onyx.extensions/replica-diff, passing it the old and new replicas. The peer produces a value summarizing what changed. This diff is used in subsequent sections to decide how to react and what side-effects to carry out.

Next, the peer calls onyx.extensions/reactions on the old/new replicas, the diff, and its local state. The peer can decide to submit new entries back to the log as a reaction to the log entry it just saw. It might react to "submit-job" with "volunteer-for-task", for instance.

diagram 5

After a peer reads a log entry and applies it to the log replica, it will (deterministically!) react by appending zero or more log entries to the tail of the log.

Finally, the peer can carry out side effects by invoking onyx.extensions/fire-side-effects!. This function will do things like talking to ZooKeeper or writing to core.async channels. Isolating side effects means that a subset of the test suite can operate on pure functions alone. Each peer is tagged with a unique ID, and it looks for this ID in changes to its replica. The ID acts very much like the object-oriented "this": the peer uses it to differentiate itself and conditionally perform side effects across an otherwise uniformly behaving distributed system.

Joining the Cluster

Aside from the log structure and any strictly data/storage centric znodes, ZooKeeper maintains another directory for pulses. Each peer registers exactly one ephemeral node in the pulses directory. The name of this znode is a UUID.

3-Phase Cluster Join Strategy

When a peer wishes to join the cluster, it must engage in a 3 phase protocol. Three phases are required because the peer that is joining needs to coordinate with another peer to change its ZooKeeper watch. I call this process "stitching" a peer into the cluster.

The technique requires peers to play by the following rules:

  • Every peer must be watched by another peer in ZooKeeper, unless there is exactly one peer in the cluster - in which case there are no watches.

  • When a peer joins the cluster, all peers must form a "ring" in terms of who-watches-whom. This makes failure repair very easy because peers can transitively close any gaps in the ring after machine failure.

  • As a peer joining the cluster begins playing the log, it must buffer all reactive messages unless otherwise specified. The buffered messages are flushed after the peer has fully joined the cluster. This is because a peer could volunteer to perform work, but later abort its attempt to join the cluster, and therefore not be able to carry out any work.

  • A peer picks another peer to watch by determining a candidate list of peers it can stitch into. This candidate list is sorted by peer ID. The target peer is chosen by taking the message id modulo the number of peers in the sorted candidate list. The peer chosen can’t be random because all peers will play the message to select a peer to stitch with, and they must all determine the same peer. Hence, the message modulo piece is a sort of "random seed" trick. See the sketch after this list.
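
Here is a sketch of that selection rule. The helper below is illustrative only (peer IDs are shown as keywords for readability; in practice they are UUIDs):

;; Every peer evaluates this with the same inputs, so all replicas
;; deterministically agree on the target peer T that stitches S in.
(defn choose-stitch-target [candidate-peer-ids message-id]
  (let [v (vec (sort candidate-peer-ids))]
    (when (seq v)
      (nth v (mod message-id (count v))))))

(choose-stitch-target #{:peer-1 :peer-2 :peer-3} 42)
;; => :peer-1  (42 mod 3 = 0, so the first entry of the sorted candidates)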

diagram 7

At monotonic clock value t = 42, the replica has the above :pairs key, which indicates who watches whom. As nodes are added, they maintain a ring formation so that every peer is watched by another.

The algorithm works as follows:

  • let S = the peer to stitch into the cluster

  • S sends a prepare-join-cluster command to the log, indicating its peer ID

  • S plays the log forward

  • Eventually, all peers encounter the prepare-join-cluster message that was sent by S

  • if the cluster size is 0:

  • S instantly becomes part of the cluster

  • S flushes its outbox of commands

  • if the cluster size (n) is >= 1:

  • let Q = this peer playing the log entry

  • let A = the set of all peers fully joined into the cluster

  • let X = the single peer paired with no one (case only when n = 1)

  • let P = set of all peers prepared to join the cluster

  • let D = set of all peers in A that are depended on by a peer in P

  • let V = sorted vector of (set-difference (set-union A X) D) by peer ID

  • if V is empty:

    • S sends an abort-join-cluster command to the log

    • when S encounters abort-join-cluster, it backs off and tries to join again later

  • let T = nth in V of message-id mod (count V)

  • let W = the peer that T watches

  • T adds a watch to S

  • T sends a notify-join-cluster command to the log, notifying S that it is watched, adding S to P

  • when S encounters notify-join-cluster:

    • it adds a watch to W

    • it sends an accept-join-cluster command, removing S from P, adding S to A

  • when accept-join-cluster has been encountered, this peer is part of the cluster

  • S flushes its outbox of commands

  • T drops its watch from W - it is now redundant, as S is watching W

diagram 13

Peers 1 - 4 form a ring. Peer 5 wants to join. Continued below…

diagram 14

Peer 5 initiates the first phase of the join protocol. Peer 1 prepares to accept Peer 5 into the ring by adding a watch to it. Continued below…

diagram 15

Peer 5 initiates the second phase of the join protocol. Peer 5 notifies Peer 4 as a peer to watch. At this point, a stable "mini ring" has been stitched along the outside of the cluster. We note that the link between Peer 1 and 4 is extraneous. Continued below…

diagram 16

Peer 5 has been fully stitched into the cluster, and the ring is intact

Dead peer removal

Peers will fail, or be shut down purposefully. Onyx needs to:

  • detect the downed peer

  • inform all peers that this peer is no longer executing its task

  • inform all peers that this peer is no longer part of the cluster

Peer Failure Detection Strategy

In a cluster of > 1 peer, when a peer dies another peer will have a watch registered on its znode to detect the ephemeral disconnect. When a peer fails (peer F), the peer watching the failed peer (peer W) needs to inform the cluster about the failure, and go watch the node that the failed node was watching (peer Z). The joining strategy that has been outlined forces peers to form a ring. A ring structure has an advantage because there is no coordination or contention as to who must now watch peer Z for failure. Peer W is responsible for watching Z, because W was watching F, and F was watching Z. Therefore, W transitively closes the ring, and W watches Z. All replicas can deterministically compute this answer without conferring with each other.

diagram 8

The nodes form a typical ring pattern. Peer 5 dies, and its connection with ZooKeeper is severed. Peer 1 reacts by reporting Peer 5’s death to the log. Continued below…

diagram 9

At t = 45, all of the replicas realize that Peer 5 is dead, and that Peer 1 is responsible for closing the gap by now watching Peer 4 to maintain the ring.

diagram 10

One edge case of this design is the simultaneous death of two or more consecutive peers in the ring. Suppose Peers 4 and 5 die at the exact same time. Peer 1 will signal Peer 5’s death, but Peer 5 never got the chance to signal Peer 4’s death. Continued below…

diagram 11

Peer 1 signals Peer 5’s death, and closes the ring by adding a watch to Peer 4. Peer 4 is dead, but no one yet knows that. We circumvent this problem by first determining whether a peer is dead or not before adding a watch to it. If it’s dead, as is Peer 4 in this case, we report it and further close the ring. Continued below…

diagram 12

Peer 1 signals Peer 4’s death, and further closes the ring by adding a watch to Peer 3. The ring is now fully intact.

Peer Failure Detection Thread

There is a window of time (between when a peer prepares to join the cluster and when its monitoring peer notifies the cluster of its presence) in which the monitoring node may fail, effectively deadlocking the new peer. This can occur because a peer will check whether its monitoring peer is dead during the prepare phase - essentially performing eviction on a totally dead cluster - and may get a false positive, concluding that a node is alive when it is actually dead. The root issue is that ephemeral znodes stick around for a short period of time after the creating process goes down. The new peer must watch its monitor until it delivers the second phase message for joining - notification. When this occurs, we can stop monitoring, because the monitoring node is clearly alive. If the znode is deleted because the process exited, we can safely evict it and free the peer from deadlocking. Issue 416 found this bug, and offers more context about the specific problem that we encountered.

Messaging

The messaging layer of Onyx employs the same technique that Apache Storm uses to achieve fault tolerance. Any errors are our own.

The Algorithm

Onyx guarantees that each segment read from an input task will be processed, providing at-least-once delivery semantics. Every segment that comes off an input task is given a UUID to track it through its lifetime. It is also assigned a peer ID to act as its "acking daemon", explained in more detail below. The segment also receives an initial "ack val". The ack val is a random 64-bit integer. Each time a segment is successfully processed at a task, this ack val is XOR’ed with itself. Further, any new segments that are generated as a result of this segment being processed are given random ack vals, too. These ack vals are also XOR’ed against the previous XOR value. When no new segments are generated, the result of XOR’ing all the segment ack vals returns 0. Finding 0 means that the segment has been successfully processed throughout the entire workflow.
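
The XOR bookkeeping is easy to see at a REPL. In this sketch the hex literals stand in for the random ack vals; the arithmetic is the point, not the Onyx API:

(let [ack-val 0x2CA7D3F9   ;; assigned when the segment enters the input task
      child-1 0x1B3355A2   ;; ack vals of two segments generated downstream
      child-2 0x77E01C4D]
  (-> ack-val
      (bit-xor ack-val child-1 child-2) ;; original segment acked, two children registered
      (bit-xor child-1)                 ;; child-1 completes with no children
      (bit-xor child-2)))               ;; child-2 completes with no children
;; => 0, meaning the input segment was fully processed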

Acking Daemon

An acking daemon is a process that runs alongside each peer and maintains state. This state is a map of segment ID to another map. The map in the value maintains the current "ack val" and the peer to send completion messages to. When the ack val for a segment is set to zero, a message is sent to the appropriate peer to release the message from durable storage. This concludes the processing of the segment, and it is considered successful. Key/value pairs are periodically reaped if the peers that are operating on these segments are lost. If these values are reaped, the message is automatically replayed from the root of the workflow on the input task on a rolling basis.

We can depict all of this visually:

messaging summary

Phases of Execution

A batch of segments runs through the following phases of execution in sequential order:

  • Inject resources: Initializes any values for this particular batch

  • Read message batch: reads and decompresses messages from the transport layer

  • Tag messages: If these are messages from an input task, uniquely tags each segment to track it through its lifetime

  • Timeout pool: If these are messages from an input task, adds these messages to a timeout pool to automatically expire on a preconfigured schedule

  • Completion: Checks if this job has been completed, and shuts down the job if so

  • Strip sentinel: Removes the sentinel if it’s in this batch, signaling that the job may be completed

  • Apply function: Applies fns to the batch of segments

  • Build new segments: Creates and IDs new segments based on the received segments

  • Write message batch: Writes messages to the next peer or output medium

  • Flow retry: Causes messages that are force-retried via flow conditions to be replayed at the input task

  • Ack messages: Acknowledges the segments that have been processed to the acking daemon

  • Close batch resources: Closes any resources opened for this specific batch

Garbage collection

One of the primary obstacles that this design imposes is the requirement of seemingly infinite storage. Log entries are only ever appended - never mutated. If left running long enough, ZooKeeper will run out of space. Similarly, if enough jobs are submitted and either completed or killed, the in memory replica that each peer houses will grow too large. Onyx requires a garbage collector to be periodically invoked.

When the garbage collector is invoked, two things happen. First, the caller of gc places an entry onto the log. As each peer processes this log entry, it carries out a deterministic, pure function to shrink the replica. The second thing occurs when each peer invokes the side effects for this log entry. The caller will have specified a unique ID such that it is the only one allowed to trim the log. The caller takes the current replica (from log entry N up to this log entry) and stores it in an "origin" znode. Any time a peer boots up, it first reads out of the origin location. Finally, the caller deletes log entry N through this log entry minus 1. This both frees up storage in ZooKeeper and makes new peers start up faster, as they have less of the log to play. They begin in a "hot" state.

The garbage collector can be invoked by the public API function onyx.api/gc. Upon returning, the log will be trimmed, and the in memory replicas will be compressed.
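
Invoking it from a client looks like the following sketch, where peer-config is assumed to be the same configuration map used when starting the peers:

(require '[onyx.api])

;; Trims the log in ZooKeeper and compresses the in-memory replicas.
(onyx.api/gc peer-config)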

diagram 17

A peer can start by reading out of the origin, and continue directly to a particular log location.

Command Reference

  • Submitter: peer (P) that wants to join the cluster

  • Purpose: determines which peer (Q) will watch P. If P is the only peer, it instantly fully joins the cluster

  • Arguments: P’s ID

  • Replica update: assoc {Q P} to :prepare key. If P is the only peer, P is immediately added to the :peers key, and no further reactions are taken

  • Side effects: Q adds a ZooKeeper watch to P’s pulse node

  • Reactions: Q sends notify-join-cluster to the log, with args P and R (R being the peer Q watches currently)

  • Submitter: peer Q helping to stitch peer P into the cluster

  • Purpose: Adds a watch from P to R, where R is the node watched by Q

  • Arguments: P and R’s ids

  • Replica update: assoc {Q P} to :accept key, dissoc {Q P} from :prepare key

  • Side effects: P adds a ZooKeeper watch to R’s pulse node

  • Reactions: P sends accept-join-cluster to the log, with args P, Q, and R

  • Submitter: peer P wants to join the cluster

  • Purpose: confirms that P can safely join, Q can drop its watch from R, since P now watches R, and Q watches P

  • Arguments: P, Q, and R’s ids

  • Replica update: dissoc {Q P} from :accept key, merge {Q P} and {P R} into :pairs key, conj P onto the :peers key

  • Side effects: Q drops its ZooKeeper watch from R

  • Reactions: peer P flushes its outbox of messages

  • Submitter: virtual peer P wants to become active in the cluster

  • Purpose: P affirms that its peer group has been safely stitched into the cluster

  • Arguments: P’s id

  • Replica update: conj P into :peers, remove from :orphaned-peers

  • Side effects: All virtual peers configure their workload and possibly start new tasks

  • Reactions: none

  • Submitter: peer (Q) determines that peer (P) cannot join the cluster (P may = Q)

  • Purpose: Aborts P’s attempt at joining the cluster, erases attempt from replica

  • Arguments: P’s id

  • Replica update: Remove any :prepared or :accepted entries where P is a key’s value

  • Side effects: P optionally backs off for a period

  • Reactions: P optionally sends :prepare-join-cluster to the log and tries again

  • Submitter: peer (Q) reporting that peer P is dead

  • Purpose: removes P from :prepared, :accepted, :pairs, and/or :peers, transitions Q’s watch to R (the node P watches) and transitively closes the ring

  • Arguments: peer ID of P

  • Replica update: assoc {Q R} into the :pairs key, dissoc {P R}

  • Side effects: Q adds a ZooKeeper watch to R’s pulse node

  • Submitter: virtual peer P is leaving the cluster

  • Purpose: removes P from its task and consideration of any future tasks

  • Arguments: peer ID of P

  • Replica update: removes P from :peers

  • Side effects: All virtual peers reconfigure their workloads for possibly new tasks

  • Submitter: peer (P), who has seen the leader sentinel

  • Purpose: P wants to propagate the sentinel to all downstream tasks

  • Arguments: P’s ID (:id), the job ID (:job), and the task ID (:task)

  • Replica update: If this peer is allowed to seal, updates :sealing-task with the task ID associated with this peer’s ID.

  • Side effects: Puts the sentinel value onto the queue

  • Reactions: None

  • Submitter: Client, via public facing API

  • Purpose: Send a catalog and workflow to be scheduled for execution by the cluster

  • Arguments: The job ID (:id), the task scheduler for this job (:task-scheduler), a topologically sorted sequence of tasks (:tasks), the catalog (:catalog), and the saturation level for this job (:saturation). Saturation denotes the number of peers this job can use, at most. This is typically Infinity, unless all catalog entries set :onyx/max-peers to an integer value. Saturation is then the sum of those numbers, since it creates an upper bound on the total number of peers that can be allocated to this task.

  • Replica update:

  • Side effects: None

  • Reactions: If the job scheduler dictates that this peer should be reallocated to this job or another job, sends :volunteer-for-task to the log

  • Submitter: Client, via public facing API

  • Purpose: Stop all peers currently working on this job, and never allow this job’s tasks to be scheduled for execution again

  • Arguments: The job ID (:job)

  • Replica update: Adds this job id to the :killed-jobs vector, removes any peers in :allocations for this job’s tasks. Switches the :peer-state for all peers executing a task for this job to :idle.

  • Side effects: If this peer is executing a task for this job, stops the current task lifecycle

  • Reactions: If this peer is executing a task for this job, reacts with :volunteer-for-task

gc

  • Submitter: Client, via public facing API

  • Purpose: Compress all peer local replicas and trim old log entries in ZooKeeper.

  • Arguments: The caller ID (:id)

  • Replica update: Clears out all data in all keys about completed and killed jobs - as if they never existed.

  • Side effects: Deletes all log entries before this command’s entry, creates a compressed replica at a special origin log location, and updates the pointer to the origin

  • Reactions: None

  • Submitter: peer (P), who has successfully started its incoming buffer

  • Purpose: Indicates that this peer is ready to receive segments as input

  • Replica update: Updates :peer-state under the :id of this peer to set its state to :active.

  • Side effects: If this task should immediately be sealed, seals this task

  • Reactions: None.

  • Submitter: This is a special entry that should never be appended to the log

  • Purpose: Perform a hard reset of the replica, replacing its entire value. This is useful if a log subscriber is reading behind a garbage collection call and tries to read a non-existent entry. The new origin can be found and its value applied locally via the subscriber.

  • Replica update: Replaces the entire value of the replica with a new value

  • Side effects: None.

  • Reactions: None.

  • Submitter: Peer that performs log initialization.

  • Purpose: Sets an identifier to the ledger that will track the state represented by this log.

  • Replica update: Updates :state-logs with the id.

  • Side effects: None.

  • Reactions: None.

APIs

Onyx ships with two distinct APIs to accommodate different needs. A description of each follows.

Core API

The Core API is used to start/stop resources, jobs, and monitor job progress. It’s accessible through the onyx.api namespace.

start-env

Starts a development environment with in-memory ZooKeeper. Helpful for developing locally without needing to start any other services.

start-peer-group

Starts a resource pool to be shared across a group of peers. You should only start one peer group per physical machine.

start-peers

Starts N virtual peers to execute tasks. In a production environment, you should start by booting up N virtual peers for N cores on the physical machine. Tune performance from there.
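
Putting the three calls above together, a local development bootstrap might look like the sketch below. The configuration keys shown are abbreviated and illustrative - consult the environment and peer configuration chapters for the authoritative set:

(require '[onyx.api])

(def onyx-id (java.util.UUID/randomUUID))

;; In-memory ZooKeeper for local development.
(def env-config
  {:zookeeper/address "127.0.0.1:2188"
   :zookeeper/server? true
   :zookeeper.server/port 2188
   :onyx/id onyx-id})

;; How peers find ZooKeeper and message each other.
(def peer-config
  {:zookeeper/address "127.0.0.1:2188"
   :onyx/id onyx-id
   :onyx.peer/job-scheduler :onyx.job-scheduler/balanced
   :onyx.messaging/impl :aeron
   :onyx.messaging/peer-port 40200
   :onyx.messaging/bind-addr "localhost"})

(def env (onyx.api/start-env env-config))
(def peer-group (onyx.api/start-peer-group peer-config))
(def v-peers (onyx.api/start-peers 3 peer-group))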

submit-job

Submits a job to Onyx to be scheduled for execution. Takes a map with keys :catalog, :workflow, :flow-conditions, :windows, :triggers, :metadata, and :task-scheduler. Returns a map of :job-id and :task-ids, which map to a UUID and vector of maps respectively. :metadata is a map of values that must serialize to EDN. :metadata will be logged with all task output, and is useful for identifying a particular task based on something other than its name or ID.

Additionally, :metadata may optionally contain a :job-id key. When specified, this key will be used for the job ID instead of a randomly chosen UUID. Repeated submissions of a job with the same :job-id will be treated as an idempotent action. If a job with the same ID has been submitted more than once, the original task IDs associated with the catalog will be returned, and the job will not run again, even if it has been killed or completed. It is undefined behavior to submit two jobs with the same :job-id metadata whose :workflow, :catalog, :flow-conditions, etc are not equal.

await-job-completion

Given a job ID, blocks the calling thread until all the tasks for this job have been completed.
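
A sketch of the two calls together - the catalog and workflow are assumed to be defined as in the earlier examples, and peer-config is the map used to start the peers:

(let [job {:workflow [[:in :inc] [:inc :out]]
           :catalog catalog
           :task-scheduler :onyx.task-scheduler/balanced}
      {:keys [job-id]} (onyx.api/submit-job peer-config job)]
  ;; Blocks until every task in the job has completed.
  (onyx.api/await-job-completion peer-config job-id))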

Tip
Example project: block-on-job-completion

gc

Invokes the garbage collector. Compresses the replica in ZooKeeper, freeing up storage and deleting log history. Frees up memory in the local, in-memory replica on all peers.

kill-job

Stops this job from executing, never allowing it to be run again.

Tip
Example project: kill-job

subscribe-to-log

Sends all events in the log to a core.async channel. Events are received in the order that they appeared in the log. Starts from the beginning of the log, blocking until more entries are available.

shutdown-peer

Shuts down a single peer, stopping any task that it is presently executing.

shutdown-peer-group

Shuts down the peer group, releasing any messaging resources it was holding open.

shutdown-env

Shuts down the development environment and stops in memory ZooKeeper.

Peer Pipeline API

The Peer Pipeline API allows you to interact with data storage mediums to read and write data for plugins.

read-batch

Reads multiple segments off the previous element in the workflow.

write-batch

Writes the batch of segments, with the function applied, to the output medium.

seal-resource

Called by one peer exactly once (subsequent calls occur if the sealing peer fails) when the task is completing. Close out target output resources.

ack-segment

Acknowledges a segment natively on the input medium, causing the segment to be released from durable storage.

retry-segment

Processes a segment again from the root of the workflow.

pending?

Given a segment ID, returns true if this segment is pending completion.

drained?

Returns true if all messages on the input medium have successfully been processed. Never returns true for an infinite message stream.

Functions

This section outlines how Onyx programs execute behavior. Onyx uses plain Clojure functions to carry out distributed activity. You have the option of performing grouping and aggregation on each function.

Functional Transformation

A Function is a construct that takes a segment as a parameter and outputs a segment or a seq of segments. Functions are meant to literally transform a single unit of data in a functional manner. The following is an example of a function:

(defn my-inc [{:keys [n] :as segment}]
  (assoc segment :n (inc n)))

Note that you may only pass segments between functions - no other shape of data is allowed.

Tip
Example project: filtering

Function Parameterization

A function can be parameterized before a job is submitted to Onyx. The segment is always the last argument to the function. There are multiple ways to parameterize a function, and they can be used in combination.

  • Via the catalog :onyx/params entry

(def catalog
  {...
   :my/param-1 "abc"
   :my/param-2 "def"
   :onyx/params [:my/param-1 :my/param-2]
   ...})

The function is then invoked with (partial f "abc" "def"). The order is controlled by the vector of :onyx/params.

  • Via :onyx.core/params in the before-task-start lifecycle hook

(defn before-task-start-hook [event lifecycle]
  {:onyx.core/params [42]})

The function is then invoked with (partial f 42).

Using this approach "hard sets" the parameters list. Other parameters may already exist in :onyx.core/params. If you want to retain those parameters, concat them together and return the new value on :onyx.core/params.
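
For example, a hook that appends a parameter while preserving anything already present might look like this sketch:

(defn before-task-start-hook [event lifecycle]
  ;; Keep any existing parameters and add 42 at the end.
  {:onyx.core/params (concat (:onyx.core/params event) [42])})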

  • Via the :onyx.peer/fn-params peer configuration

(def peer-opts
  {...
   :onyx.peer/fn-params {:my-fn-name [64]}})

The function is then invoked with (partial f 64).

This approach is useful for parameterizing a task regardless of which job it is in. If both :onyx.peer/fn-params and :onyx/params are set for the same task, they are concatenated together, with fn-params coming first.

Grouping & Aggregation

Grouping ensures that "like" values are always routed to the same virtual peer, presumably to compute an aggregate. Grouping is specified inside of a catalog entry. There are two ways to group: by key of segment, or by arbitrary function. Grouping by key is a convenience that reaches into each segment and pins all segments with the same value for that key together. Grouping functions receive a single segment as input. The output of a grouping function is the value to group on. Grouped functions must set the keys :onyx/min-peers and :onyx/flux-policy. See below for a description of these.

Group By Key

To group by a key or a vector of keys in a segment, use :onyx/group-by-key in the catalog entry:

{:onyx/name :sum-balance
 :onyx/fn :onyx.peer.kw-grouping-test/sum-balance
 :onyx/type :function
 :onyx/group-by-key :name
 :onyx/min-peers 3
 :onyx/flux-policy :continue
 :onyx/batch-size 1000}

Group By Function

To group by an arbitrary function, use :onyx/group-by-fn in the catalog entry:

{:onyx/name :sum-balance
 :onyx/fn :onyx.peer.fn-grouping-test/sum-balance
 :onyx/type :function
 :onyx/group-by-fn :onyx.peer.fn-grouping-test/group-by-name
 :onyx/min-peers 3
 :onyx/flux-policy :continue
 :onyx/batch-size 1000}
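
The grouping function referenced by :onyx/group-by-fn is an ordinary function of one segment that returns the value to group on. A sketch:

(defn group-by-name [segment]
  ;; Segments with equal :name values are routed to the same virtual peer.
  (:name segment))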

Flux Policies

Functions that use the grouping feature are presumably stateful. For this reason, unless :continue is used, no new peers will be allocated to grouping tasks once a job begins, no matter how many peers are added to the cluster. If more peers were added to a grouping task after the job began, the hashing algorithm would lose its consistency, and stateful operations wouldn’t work correctly.

Given that Onyx will not add more peers to regular grouping tasks after a job begins, we introduce a new parameter - :onyx/min-peers. This should be set to an integer that indicates the minimum number of peers that will be allocated to this task before the job can begin. Onyx may schedule more than the minimum number that you set. You can create an upper bound by also using :onyx/max-peers.

Tip
Example project: max-peers.

One concern that immediately needs to be addressed is what happens if a peer on a grouping task leaves the cluster after the job has begun. Clearly, removing a peer from a grouping task also breaks the consistent hashing algorithm that supports statefulness. The policy that is enforced is configurable, and must be chosen by the developer. The available policies are outlined below.

Continue Policy

When :onyx/flux-policy is set to :continue on a catalog entry, the hashing algorithm may be inconsistent. Peers can leave or join a task at any point in time. This is desirable for streaming jobs where the data is theoretically infinite, or for tasks that benefit from grouping but are not stateful.

Kill Policy

When :onyx/flux-policy is set to :kill, the job is killed and all peers abort execution of the job. Some jobs cannot compute correct answers if there is a shift in the hashing algorithm’s consistency. An example of this is a word count batch job.

Recover Policy

When :onyx/flux-policy is set to :recover, the job continues as-is if any peers abort execution of the task. If any other peers are available, they will be added to this task to progressively meet the :onyx/min-peers number of peers concurrently working on this task.

Bulk Functions

Sometimes you might be able to perform a function more efficiently over a batch of segments rather than processing one segment at a time, such as writing segments to a database in a non-output task. You can receive the entire batch of segments in bulk as an argument to your task by setting :onyx/bulk? to true in your catalog entry for your function. Onyx will ignore the output of your function and pass the same segments that you received downstream. The utility of this feature is that you receive the entire batch in one shot. Onyx ignores your output because it would make it impossible to track which specific messages are children of particular upstream messages - breaking Onyx’s fault tolerance mechanism.

Functions with this key enabled may not be used with flow conditions. These segments are passed to all immediate downstream tasks.

An example catalog entry:

{:onyx/name :inc
 :onyx/fn :onyx.peer.batch-function-test/my-inc
 :onyx/type :function
 :onyx/bulk? true
 :onyx/batch-size batch-size}

And an example catalog function to correspond to this entry:

(defn my-inc [segments]
  (prn segments)
  :ignored-return-value)

The default value for this option is false.

Leaf Functions

Sometimes you’re going to want a node in your workflow with no outgoing connections that doesn’t perform I/O against a database. You can do this by setting :onyx/type to :output, :onyx/medium to :function, and :onyx/plugin to :onyx.peer.function/function. Then you can specify an :onyx/fn pointing to a regular Clojure function. For example:

{:onyx/name :leaf-task
 :onyx/fn ::add-to-results
 :onyx/plugin :onyx.peer.function/function
 :onyx/medium :function
 :onyx/type :output
 :onyx/batch-size 20}
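
The ::add-to-results function referenced above is a regular Clojure function. A sketch that collects segments into an atom (the results atom is hypothetical):

(def results (atom []))

(defn add-to-results [segment]
  (swap! results conj segment)
  ;; There are no outgoing connections, so the return value goes nowhere.
  segment)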

Flow Conditions

This section covers flow conditions. Flow conditions are used for isolating logic about whether or not segments should pass through different tasks in a workflow, and support a rich degree of composition with runtime parameterization.

Summary

Workflows specify the structure of your computation as a directed, acyclic graph. A workflow describes all possible routes that a segment can take as it enters your workflow. On the other hand, we often have the need to specify how an individual segment moves throughout your workflow. Many times, a segment conditionally moves from one task to another. This is a concept that Onyx takes apart and turns into its own idea, independent of the rest of your computation. They’re called Flow Conditions. It should be mentioned straight away that Flow Conditions are entirely optional, and your program can ignore them entirely if you’d like. Omitting them leads to the default behavior, which sends a segment to all immediate downstream tasks.

Motivating Example

The easiest way to learn how to use flow conditions is to see an example. Suppose we have the following workflow snippet:

[[:input-stream :process-children]
 [:input-stream :process-adults]
 [:input-stream :process-female-athletes]
 [:input-stream :process-everyone]
 ...]

This workflow takes some input in (presumably a stream of people), and directs segments to four possible tasks - :process-children, :process-adults, :process-female-athletes, and :process-everyone. Suppose we want to conditionally direct a segment to zero or more of these tasks, depending on some predicates. We use flow conditions to carry out this work. Flow conditions are their own data structure that are bundled along with the workflow and catalog to onyx.api/submit-job (with key :flow-conditions). Here’s an example of what a flow conditions data structure would look like for our proposed workflow:

[{:flow/from :input-stream
  :flow/to [:process-children]
  :my/max-child-age 17
  :flow/predicate [:my.ns/child? :my/max-child-age]
  :flow/doc "Emits segment if this segment is a child."}

 {:flow/from :input-stream
  :flow/to [:process-adults]
  :flow/predicate :my.ns/adult?
  :flow/doc "Emits segment if this segment is an adult."}

 {:flow/from :input-stream
  :flow/to [:process-female-athletes]
  :flow/predicate [:and :my.ns/female? :my.ns/athlete?]
  :flow/doc "Emits segment if this segment is a female athlete."}

 {:flow/from :input-stream
  :flow/to [:process-everyone]
  :flow/predicate :my.ns/constantly-true
  :flow/doc "Always emit this segment"}]

The basic idea is that every entry in the Flow Conditions data structure denotes a relationship between a task and its downstream tasks. :flow/from indicates the task that the segment is leaving, and :flow/to indicates the tasks that the segment should be sent to if the predicate evaluates to true. The predicate is denoted by :flow/predicate, which is a keyword or sequence of keywords that are resolved to a function. Later in this section, we’ll cover how exactly the predicate function is constructed.

There is one flow conditions data structure per job - that is, there is one vector of maps. The order that you specify the flow conditions in matters. More on that later in this section.

Tip
Example project: flow-combine

Predicate Function Signatures

A predicate function is a Clojure function that takes at least four parameters - a context map, the old segment, the new segment, and the collection of all new segments produced from the old segment. Predicates can take parameters at runtime. They will be appended to the end of the function invocation. See Predicate Parameters for further discussion of how to use runtime parameters.

Predicates for the above examples can be seen below:

(defn child? [event old-segment new-segment all-new max-age]
  (<= (:age new-segment) max-age))

(defn adult? [event old-segment new-segment all-new]
  (>= (:age new-segment) 18))

(defn female? [event old-segment new-segment all-new]
  (= (:gender new-segment) "Female"))

(defn athlete? [event old-segment new-segment all-new]
  (= (:job new-segment) "athlete"))

(def constantly-true (constantly true))

Predicate Parameters

Predicate functions can take parameters at runtime. In the first flow condition above, we use the parameter :my/max-child-age and set its value to 17. We pass this value to the predicate by surrounding it with brackets, as in [:my.ns/child? :my/max-child-age]. The parameters are appended to the end of the function call to the predicate. See Predicate Function Signatures in this section for the arguments that are passed into the predicate on every invocation, regardless of parameterization.

Key Exclusion

Sometimes, the decision of whether to allow a segment to pass through to the next task depends on some side effects that were a result of the original segment transformation. Onyx allows you to handle this case by adding extra keys to the segment that comes out of your transformation function. These extra keys are visible in your predicate function, and then stripped off before being sent to the next task. You can indicate these "extra keys" by setting :flow/exclude-keys to a vector of keys.

For example, if we had the following transformation function:

(defn my-function [x]
  (assoc x :result 42 :side-effects-result :blah))

Our predicate for flow conditions might need to use the :side-effects-result to make a decision. We don’t want to actually send that information out to the next task, though - so we use :flow/exclude-keys on :side-effects-result to make it disappear after the predicate result has been realized.

{:flow/from :input-stream
 :flow/to [:process-adults]
 :flow/predicate :my.ns/adult?
 :flow/exclude-keys [:side-effects-result]
 :flow/doc "Emits segment if this segment is an adult."}
Tip
Example project: flow-exclude-keys

Predicate Composition

One extraordinarily powerful feature of Flow Conditions is its composition characteristics. Predicates can be composed with logical and, or, and not. We use composition to check if the segment is both female and an athlete in [:and :my.ns/female? :my.ns/athlete?]. Logical function calls must be surrounded with brackets, and may be nested arbitrarily. Functions inside of logical operator calls may be parameterized, as in [:and :my.ns/female? [:my.ns/child? :my/max-child-age]]. Parameters may not specify logical functions.

Tip
Example project: flow-predicate-composition

Match All/None

Sometimes, you want a flow condition that emits a value to all tasks if the predicate is true. You can use shorthand to emit to all downstream tasks:

{:flow/from :input-stream
 :flow/to :all
 :flow/short-circuit? true
 :flow/predicate :my.ns/adult?}

Similarly, sometimes you want to emit to no downstream tasks:

{:flow/from :input-stream
 :flow/to :none
 :flow/short-circuit? true
 :flow/predicate :my.ns/adult?}

If a flow condition specifies :all as its :flow/to, it must come before any other flow conditions. If a flow condition specifies :none as its :flow/to, it must come directly behind an :all condition, or first if there is no :all condition. This is because of the semantics of short circuiting. We’ll discuss what short circuiting means later in this section.

Flow conditions with :flow/to set to :all or :none must always set :flow/short-circuit? to true.

:flow/from may be set to :all. This directs all immediate upstream links to pass segments to this task’s flow condition. :flow/from as :all does not impose order constraints as :flow/to set to :all does.

Short Circuiting

If multiple flow condition entries evaluate to a true predicate, their :flow/to values are unioned (duplicates aren’t acknowledged), as are their :flow/exclude-keys. Sometimes you don’t want this behavior, and you want to specify exactly the downstream tasks to emit to - and not check any more flow condition entries. You can do this by setting :flow/short-circuit? to true. Any entry that has :flow/short-circuit? set to true must come before any entries for a task that have it set to false or nil.

Tip
Example project: flow-short-circuit

Exceptions

Flow Conditions give you leverage for handling exceptions without miring your code in try/catch logic. If an exception is thrown from an Onyx transformation function, you can capture it from within your flow conditions by setting :flow/thrown-exception? to true. Its default value is false. If an exception is thrown, only flow conditions with :flow/thrown-exception? set to true will be evaluated. The value that is normally the segment sent to the predicate will instead be the exception object that was thrown. Exception flow conditions must have :flow/short-circuit? set to true. Note that exceptions don’t serialize. This feature is meant to be used in conjunction with Post-transformations and Actions for sending exception values to downstream tasks.

{:flow/from :input-stream
 :flow/to [:error-task]
 :flow/short-circuit? true
 :flow/thrown-exception? true
 :flow/predicate :my.ns/handle-error?}

And the predicate might be:

(defn handle-error? [event old ex-obj all-new]
  (= (type ex-obj) java.lang.NullPointerException))

This will only restrict the flow from :input-stream to :error-task when an exception is thrown - see the discussion of Short Circuiting above. When an exception is not thrown, the default behaviour applies: if there are later flow conditions for that task, they are evaluated as usual, and if there are none, the segment flows through to all downstream tasks.

Post-transform

Post-transformations are an extension provided to handle segments that cause exceptions to arise. If a flow condition has :flow/thrown-exception? set to true, it can also set :flow/post-transform to a keyword. This keyword must be the fully namespace-qualified name of a function on the classpath. This function will be invoked with three parameters: the event map, the segment that caused the exception, and the exception object. The result of this function, which must be a segment, will be passed to the downstream tasks. This allows you to come up with a reasonable value to pass downstream when you encounter an exception, since exceptions don’t serialize anyway. :flow/exclude-keys will be applied to the resulting transformed segment.

Example:

{:flow/from :input-stream
 :flow/to [:error-task]
 :flow/short-circuit? true
 :flow/thrown-exception? true
 :flow/post-transform :my.ns/post-transform
 :flow/predicate :my.ns/handle-error?}

And an example post-transform function might be:

(defn post-transform [event segment exception-obj]
  {:error :my-exception-value})

Actions

After a set of flow conditions has been evaluated for a segment, you usually want to send the segment downstream to the next set of tasks. Other times, you want to retry to process the segment because something went wrong. Perhaps a database connection wasn’t available, or an email couldn’t be sent.

Onyx provides the Flow Conditions :flow/action to accomplish this. By setting :flow/action to :retry, a segment will expire from the internal pool of pending messages and be automatically retried from its input task. If any of the :flow/action values from the matching flow conditions are :retry, the segment will be retried and will not be sent downstream. This parameter is optional, and its default value is nil. nil will cause the segment to be sent to all downstream tasks that were selected from evaluating the flow conditions. Any flow condition clauses with :flow/action set to :retry must also have :flow/short-circuit? set to true, and :flow/to set to :none.

Here’s a quick example:

[{:flow/from :input-stream
  :flow/to :none
  :flow/short-circuit? true
  :flow/predicate :my.ns/adult?
  :flow/action :retry}

 {:flow/from :input-stream
  :flow/to [:task-a]
  :flow/predicate :my.ns/child?}]

Messaging

Background

The messaging layer takes care of the direct peer-to-peer transfer of segment batches, acks, segment completions, and segment retries to the relevant virtual peers.

Messaging Implementations

The Onyx messaging implementation is pluggable and alternative implementations can be selected via the :onyx.messaging/impl Peer Configuration option.

Aeron Messaging

Owing to Aeron’s high throughput and low latency, Aeron is the default Onyx messaging implementation. There are a few relevant considerations when using the Aeron implementation.

Subscription (Connection) Multiplexing

One issue when scaling Onyx to a many-node cluster is that every virtual peer may require a communications channel to any other virtual peer. As a result, a naive implementation requires up to m*m connections over the cluster, where m is the number of virtual peers. By sharing Aeron subscribers between virtual peers on a node, this can be reduced to n*n connections, where n is the number of nodes. This reduces the amount of overhead required to maintain connections between peers, allowing the cluster to scale better as the number of nodes increases.

It is worth noting that Aeron subscribers (receivers) must also generally perform deserialization. Therefore, subscribers may become CPU bound by the amount of deserialization work that needs to be performed. In order to reduce this effect, multiple subscribers can be instantiated per node. This can be tuned via :onyx.messaging.aeron/subscriber-count in Peer Configuration. As increasing the number of subscribers may lead back to an undesirable growth in the number of connections between nodes, each node will only choose one subscription to communicate through. The choice of subscriber is calculated via a hash of the combined IPs of the communicating nodes, in order to consistently spread the use of subscribers over the cluster.

Clusters which spend a large proportion of their time serializing should consider increasing the subscriber count. As a general guide, cores = virtual peers + subscribers.
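
Both settings mentioned above live in the peer configuration. A sketch, with illustrative values:

(def peer-config
  {;; ... other peer configuration ...
   :onyx.messaging/impl :aeron
   :onyx.messaging.aeron/subscriber-count 2})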

Connection Short Circuiting

When virtual peers are co-located on the same node, messaging will bypass the use of Aeron and directly communicate the message without any use of the network and without any serialization. Therefore, performance benchmarks performed on a single node can be very misleading.

The Peer Configuration option, :onyx.messaging/allow-short-circuit? is provided for the purposes of more realistic performance testing on a single node.

Port Use

The Aeron messaging implementation will use the port configured via :onyx.messaging/peer-port. This UDP port must be unfirewalled.
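
For example, a peer-config fragment for messaging might look like the following sketch (the port number is illustrative; disabling short circuiting is only appropriate for single-node performance testing, as described above):

{:onyx.messaging/impl :aeron
 ;; UDP port used by Aeron; it must not be firewalled between peers.
 :onyx.messaging/peer-port 40200
 ;; Set to false only for realistic single-node performance testing.
 :onyx.messaging/allow-short-circuit? false}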

Media Driver

Aeron requires a media driver to be used on each node. Onyx provides an embedded media driver for local testing, however use of the embedded driver is not recommended in production. The embedded driver can be configured via the :onyx.messaging.aeron/embedded-driver? Peer Configuration option.
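
A minimal sketch of enabling the embedded driver in the peer-config for local testing only (not recommended in production, as noted above):

{:onyx.messaging/impl :aeron
 ;; Start an embedded Aeron media driver inside the peer process.
 :onyx.messaging.aeron/embedded-driver? true}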

When using Aeron messaging in production, a media driver should be created in another Java process. You can do this via the following code snippet, or by using the Aeron distribution.

(ns your-app.aeron-media-driver
  (:require [clojure.core.async :refer [chan <!!]])
  (:import [uk.co.real_logic.aeron.driver MediaDriver MediaDriver$Context]))

(defn -main [& args]
  ;; Build a default driver context and launch the media driver in this process.
  (let [ctx (MediaDriver$Context.)
        media-driver (MediaDriver/launch ctx)]
    (println "Launched the Media Driver. Blocking forever...")
    ;; Block on a channel that is never written to, keeping the driver alive.
    (<!! (chan))))
Configuration Options

Aeron is independently configurable via Java properties (e.g. JAVA_OPTS="-Daeron.mtu.length=16384"). Configuring these may result in different performance characteristics, and certain options may need to be configured in order to communicate large segments between peers.

Documentation for these configuration options can be found in Aeron’s documentation.

Lifecycles

Lifecycles are a feature that allow you to control code that executes at particular points during task execution on each peer. Lifecycles are data driven and composable.

Summary

There are several interesting points at which to execute arbitrary code during a task in Onyx. Onyx lets you plug in functions that are called before a task, after a task, before a batch, and after a batch on every peer. Additionally, there is another lifecycle hook that allows you to delay starting a task in case you need to do some work, like acquiring a lock under contention. A peer’s lifecycle is isolated to itself, and lifecycles never coordinate across peers. Usage of lifecycles is entirely optional. Lifecycle data is submitted as a data structure at job submission time.

Lifecycle Phases

Before task set up

A function that takes two arguments - an event map, and the matching lifecycle map. Must return a boolean value indicating whether to start the task or not. If false, the process backs off for a preconfigured amount of time and calls this function again. Useful for lock acquisition. This function is called prior to any processes inside the task becoming active.

Before task execution

A function that takes two arguments - an event map, and the matching lifecycle map. Must return a map that is merged back into the original event map. This function is called after processes in the task are launched, but before the peer listens for incoming segments from other peers.

Before segment batch start

A function that takes two arguments - an event map, and the matching lifecycle map. Must return a map that is merged back into the original event map. This function is called prior to receiving a batch of segments from the reading function.

After read segment batch

A function that takes two arguments - an event map, and the matching lifecycle map. Must return a map that is merged back into the original event map. This function is called immediately after a batch of segments has been read by the peer. The segments are available in the event map by the key :onyx.core/batch.

After segment batch start

A function that takes two arguments - an event map, and the matching lifecycle map. Must return a map that is merged back into the original event map. This function is called after all messages have been written and acknowledged.

After task execution

A function that takes two arguments - an event map, and the matching lifecycle map. Must return a map that is merged back into the original event map. This function is called before the peer relinquishes its task. No more segments will be received.

After ack message

A function that takes four arguments - an event map, a message id, the return of an input plugin ack-segment call, and the matching lifecycle map. May return a value of any type which will be discarded. This function is called whenever a segment at the input task has been fully acked.

After retry message

A function that takes four arguments - an event map, a message id, the return of an input plugin ack-segment call, and the matching lifecycle map. May return a value of any type which will be discarded. This function is called whenever a segment at the input task has been pending for greater than pending-timeout time and will be retried.

Handle Exception

One or more handle-exception lifecycle handlers may be defined for exceptions thrown during any lifecycle execution except after-task-stop. If present, the exception will be caught and passed to this function. See the details on the Onyx cheat sheet.

Example

Let’s work with an example to show how lifecycles work. Suppose you want to print out a message at all the possible lifecycle hooks. You’d start by defining 9 functions for the 9 hooks:

(ns my.ns)

(defn start-task? [event lifecycle]
  (println "Executing once before the task starts.")
  true)

(defn before-task-start [event lifecycle]
  (println "Executing once before the task starts.")
  {})

(defn after-task-stop [event lifecycle]
  (println "Executing once after the task is over.")
  {})

(defn before-batch [event lifecycle]
  (println "Executing once before each batch.")
  {})

(defn after-read-batch [event lifecycle]
  (printn "Executing once after this batch has been read.")
  {})

(defn after-batch [event lifecycle]
  (println "Executing once after each batch.")
  {})

(defn after-ack-segment [event message-id rets lifecycle]
  (println "Message " message-id " is fully acked"))

(defn after-retry-segment [event message-id rets lifecycle]
  (println "Retrying message " message-id))

(defn handle-exception [event lifecycle lifecycle-phase e]
  (println "Caught exception: " e)
  (println "Returning :restart, indicating that this task should restart.")
  :restart)

Notice that all lifecycle functions return maps except start-task?. This map is merged back into the event parameter that you received. start-task? is a boolean function that allows you to block and back off if you don’t want to start the task quite yet. This function will be called periodically as long as false is returned. If more than one start-task? is specified in your lifecycles, they must all return true for the task to begin. start-task? is invoked before before-task-start.

Next, define a map that wires all these functions together by mapping predefined keywords to the functions:

(def calls
  {:lifecycle/start-task? start-task?
   :lifecycle/before-task-start before-task-start
   :lifecycle/before-batch before-batch
   :lifecycle/after-read-batch after-read-batch
   :lifecycle/after-batch after-batch
   :lifecycle/after-task-stop after-task-stop
   :lifecycle/after-ack-segment after-ack-segment
   :lifecycle/after-retry-segment after-retry-segment
   :lifecycle/handle-exception handle-exception})

Each of these 9 keys maps to a function. All of these keys are optional, so you can mix and match depending on which functions you actually need to use.

Finally, create a lifecycle data structure by pointing :lifecycle/calls to the fully qualified namespaced keyword that represents the calls map that we just defined. Pass it to your onyx.api/submit-job call:

(def lifecycles
  [{:lifecycle/task :my-task-name-here
    :lifecycle/calls :my.ns/calls
    :lifecycle/doc "Test lifecycles and print a message at each stage"}])

(onyx.api/submit-job
  peer-config
  {...
   :lifecycles lifecycles
   ...})

It is also possible to have a lifecycle apply to every task in a workflow by specifying :lifecycle/task :all. This is useful for instrumenting your tasks with metrics, error handling, or debugging information.

(def lifecycles
  [{:lifecycle/task :all
    :lifecycle/calls :my.ns/add-metrics
    :lifecycle/doc "Instruments all tasks in a workflow with the example function 'add-metrics'"}])

You can supply as many sets of lifecycles as you want. They are invoked in the order that they are supplied in the vector, giving you a predictable sequence of calls. Be sure that all the keyword symbols and functions are required onto the classpath for the peer that will be executing them.

Tip
Example project: lifecycles

Backpressure

A common problem with streaming platforms is whether peers and tasks can exert backpressure on upstream tasks.

When Onyx’s internal messaging buffers overflow, the oldest segments in the buffer are discarded (i.e. a sliding buffer). While this ensures that the freshest segments are likely to make it through the entire workflow and be fully acked, input segments are still likely to be retried.

One important form of backpressure is the :onyx/max-pending task parameter (see the Information Model), which may be configured on input tasks. Input tasks will only produce new segments if there are fewer than max-pending pending (i.e. not fully acked) input segments.
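
As a hedged sketch, an input task’s catalog entry might cap its in-flight segments like this (the plugin and medium names are placeholders, and 10000 is only an illustrative value):

{:onyx/name :read-events
 :onyx/plugin :my.plugin/input   ;; placeholder input plugin
 :onyx/type :input
 :onyx/medium :my-medium         ;; placeholder medium
 ;; At most 10000 not-fully-acked segments may be pending from this task.
 :onyx/max-pending 10000
 :onyx/batch-size 20}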

One problem with max-pending as a form of backpressure is that it doesn’t take into account the number of segments produced by intermediate tasks, nor whether these segments are filling up the inbound buffers of later tasks (due to slow processing, large numbers of produced segments, etc).

Onyx uses a simple scheme to allow backpressure to push back to upstream input tasks. When a virtual peer fills up past a high water mark, the peer writes a log message to say that its internal buffers are filling up (backpressure-on). If any peer currently allocated to a job has set backpressure-on, then all peers allocated to input tasks will stop reading from the input sources.

When a peer’s messaging buffer is reduced to below a low water mark, it writes a backpressure-off log message. If no peers allocated to a job are currently set to backpressure, then peers allocated to input tasks will resume reading from their input sources.

Refer to Peer Config for information regarding the default backpressure settings, and how to override them.
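
As a rough sketch of overriding them in the peer-config (the exact key names and defaults should be verified against the Peer Config documentation; values here are illustrative):

{;; Buffer fill percentage at which a peer logs backpressure-on.
 :onyx.peer/backpressure-high-water-pct 60
 ;; Buffer fill percentage below which a peer logs backpressure-off.
 :onyx.peer/backpressure-low-water-pct 30
 ;; How often, in milliseconds, each peer checks its buffers.
 :onyx.peer/backpressure-check-interval 10}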

Windowing and Aggregation

This section discusses a feature called windowing. Windows allow you to group and accrue data into possibly overlapping buckets. Windows are intimately related to the Triggers feature. When you’re finished reading this section, head over to the Triggers chapter next.

Summary

Windowing splits up a possibly unbounded data set into finite, possibly overlapping portions. Windows allow us to create aggregations over distinct portions of a stream, rather than stalling and waiting for the entire data set to arrive. In Onyx, windows strictly describe how data is accrued. When you want to do something with the windowed data, you use a Trigger. See the chapter on Triggers for more information. Onyx’s windowing mechanisms are strong enough to handle stream disorder. If your data arrives in an order that isn’t "logical" (for example, :event-time keys moving backwards in time), Onyx can sort out the appropriate buckets to put the data in.

Window Types

The topic of windows has been widely explored in the literature. There are different types of windows. Currently, Onyx supports Fixed, Sliding, Global, and Session windows. We will now explain the supported window types.

Fixed Windows

Fixed windows, sometimes called Tumbling windows, span a particular range and do not slide. That is, fixed windows never overlap one another. Consequently, a data point will fall into exactly one instance of a window (often called an extent in the literature). As it turns out, fixed windows are a special case of sliding windows where the range and slide values are equal. You can see a visual below of how this works, where the |--| drawings represent extents. Each window is of range 5. Time runs horizontally, while the right-hand side features the extent bound running vertically. The first extent captures all values between 0 and 4.99999…​

1, 5, 10, 15, 20, 25, 30, 35, 40
|--|                                [0  - 4]
   |--|                             [5  - 9]
      |---|                         [10 - 14]
          |---|                     [15 - 19]
              |---|                 [20 - 24]
                  |---|             [25 - 29]
                      |---|         [30 - 34]
                          |---|     [35 - 39]

Example:

{:window/id :collect-segments
 :window/task :identity
 :window/type :fixed
 :window/aggregation :onyx.windowing.aggregation/count
 :window/window-key :event-time
 :window/range [5 :minutes]}
Tip
Example project: fixed-windows

Sliding Windows

In contrast to fixed windows, sliding windows allow extents to overlap. When a sliding window is specified, we have to give it a range that the window spans, and a slide value for how long to wait between spawning a new window extent. Every data point will fall into exactly range / slide window extents. We draw out what this looks like for a sliding window with range 15 and slide 5:

1, 5, 10, 15, 20, 25, 30, 35, 40
|---------|                         [0  - 14]
   |----------|                     [5  - 19]
      |-----------|                 [10 - 24]
          |-----------|             [15 - 29]
              |-----------|         [20 - 34]
                  |-----------|     [25 - 39]

Example:

{:window/id :collect-segments
 :window/task :identity
 :window/type :sliding
 :window/aggregation :onyx.windowing.aggregation/conj
 :window/window-key :event-time
 :window/range [15 :minutes]
 :window/slide [5 :minute]}
Tip
Example project: sliding-windows

Global Windows

Global windows are perhaps the easiest to understand. With global windows, there is exactly one window extent that matches all data that enters it. This lets you capture events that span an entire domain of time. Global windows are useful for modeling batch or timeless computations.

<- Negative Infinity                Positive Infinity ->
|-------------------------------------------------------|

Example:

{:window/id :collect-segments
 :window/task :identity
 :window/type :global
 :window/aggregation :onyx.windowing.aggregation/count
 :window/window-key :event-time}
Tip
Example project: global-windows

Session Windows

Session windows are windows that dynamically resize their upper and lower bounds in reaction to incoming data. Sessions capture a time span of activity for a specific key, such as a user ID. If no activity occurs within a timeout gap, the session closes. If an event occurs within the bounds of a session, the window is fused with the new event, and the session is extended by its timeout gap in either the forward or backward direction.

For example, if events with the same session key occurred at 5, 7, and 20, and the session window used a timeout gap of 5, the windows would look like the following:

1, 5, 10, 15, 20, 25, 30, 35, 40
   |-|                           [5 - 7]
              |                  [20 - 20]

Windows that aren’t fused to anything are single points in time (see 20). If an event occurs before or after another event within its timeout gap on the timeline, the two events fuse, as 5 and 7 do.

Example:

{:window/id :collect-segments
 :window/task :identity
 :window/type :session
 :window/aggregation :onyx.windowing.aggregation/conj
 :window/window-key :event-time
 :window/session-key :id
 :window/timeout-gap [5 :minutes]}
Tip
Example project: session-windows

Units

Onyx allows you to specify range and slide values in different magnitudes of units, so long as the units can be converted to the same unit in the end. For example, you can specify the range in minutes, and the slide in seconds. Any value that requires units takes a vector of two elements. The first element represents the value, and the second the unit. For example, window specifications denoting range and slide might look like:

{:window/range [1 :minute]
 :window/slide [30 :seconds]}

See the information model for all supported units. You can use a singular form (e.g. :minute) instead of the plural (e.g. :minutes) where it makes sense for readability.

Onyx is also capable of sliding by :elements. This is often referred to as "slide-by-tuple" in research. Onyx doesn’t require a time-based range and slide value. Any totally ordered value will work equivalently.
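
As a sketch of slide-by-tuple, the same window map shape can take :elements for its range and slide; the window key and sizes below are illustrative:

{:window/id :collect-segments
 :window/task :identity
 :window/type :sliding
 :window/aggregation :onyx.windowing.aggregation/conj
 ;; Any totally ordered key works; here we window over a sequence number.
 :window/window-key :sequence-id
 :window/range [100 :elements]
 :window/slide [10 :elements]}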

Aggregation

Windows allow you to accrete data over time. Sometimes you want to store all the data. Other times you want to incrementally compact the data. Window specifications must provide a :window/aggregation key. We’ll go over an example of every type of aggregation that Onyx supports.

:onyx.windowing.aggregation/conj

The :conj aggregation is the simplest. It collects segments for this window and retains them in a vector, unchanged.

{:window/id :collect-segments
 :window/task :identity
 :window/type :sliding
 :window/aggregation :onyx.windowing.aggregation/conj
 :window/window-key :event-time
 :window/range [30 :minutes]
 :window/slide [5 :minutes]
 :window/doc "Collects segments on a 30 minute window sliding every 5 minutes"}

:onyx.windowing.aggregation/count

The :onyx.windowing.aggregation/count operation counts the number of segments in the window.

{:window/id :count-segments
 :window/task :identity
 :window/type :fixed
 :window/aggregation :onyx.windowing.aggregation/count
 :window/window-key :event-time
 :window/range [1 :hour]
 :window/doc "Counts segments in one hour fixed windows"}

:onyx.windowing.aggregation/sum

The :sum operation adds the values of :age for all segments in the window.

{:window/id :sum-ages
 :window/task :identity
 :window/type :fixed
 :window/aggregation [:onyx.windowing.aggregation/sum :age]
 :window/window-key :event-time
 :window/range [1 :hour]
 :window/doc "Adds the :age key in all segments in 1 hour fixed windows"}

:onyx.windowing.aggregation/min

The :min operation retains the minimum value found for :age. An initial value must be supplied via :window/init.

{:window/id :min-age
 :window/task :identity
 :window/type :fixed
 :window/aggregation [:onyx.windowing.aggregation/min :age]
 :window/init 100
 :window/window-key :event-time
 :window/range [30 :minutes]
 :window/doc "Finds the minimum :age in 30 minute fixed windows, default is 100"}

:onyx.windowing.aggregation/max

The :max operation retains the maximum value found for :age. An initial value must be supplied via :window/init.

{:window/id :max-age
 :window/task :identity
 :window/type :fixed
 :window/aggregation [:onyx.windowing.aggregation/max :age]
 :window/init 0
 :window/window-key :event-time
 :window/range [30 :minutes]
 :window/doc "Finds the maximum :age in 30 minute fixed windows, default is 0"}

:onyx.windowing.aggregation/average

The :average operation maintains an average over :age. The state is maintained as a map with three keys - :n, the number of elements, :sum, the running sum, and :average, the running average.

{:window/id :average-age
 :window/task :identity
 :window/type :fixed
 :window/aggregation [:onyx.windowing.aggregation/average :age]
 :window/window-key :event-time
 :window/range [30 :minutes]
 :window/doc "Finds the average :age in 30 minute fixed windows, default is 0"}

:onyx.windowing.aggregation/collect-by-key

The :collect-by-key operation maintains a collection of all segments with a common key.

{:window/id :collect-members
 :window/task :identity
 :window/type :fixed
 :window/aggregation [:onyx.windowing.aggregation/collect-by-key :team]
 :window/window-key :event-time
 :window/range [30 :minutes]
 :window/doc "Collects all users on the same :team in 30 minute fixed windows"}

Grouping

All of the above aggregates have slightly different behavior when :onyx/group-by-key or :onyx/group-by-fn are specified on the catalog entry. Instead of maintaining a scalar value in the aggregate, Onyx maintains a map. The keys of the map are the grouped values, and the values of the map are normal scalar aggregates.

For example, if you had the catalog entry set to :onyx/group-by-key with value :name, and you used a window aggregate of :onyx.windowing.aggregation/count, and you sent through segments [{:name "john"} {:name "tiffany"} {:name "john"}], the aggregate map would look like {"john" 2 "tiffany" 1}.
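
As a hedged sketch of that scenario (task and window names are illustrative), the catalog entry declares the grouping, and the window’s count aggregate then produces the per-name map:

;; Catalog entry: group incoming segments by :name before windowing.
{:onyx/name :count-names
 :onyx/fn :clojure.core/identity
 :onyx/type :function
 :onyx/group-by-key :name
 :onyx/flux-policy :recover
 :onyx/min-peers 2
 :onyx/batch-size 20}

;; Window: the count aggregate now yields a map of grouped value -> count,
;; e.g. {"john" 2 "tiffany" 1} for the segments above.
{:window/id :count-per-name
 :window/task :count-names
 :window/type :global
 :window/aggregation :onyx.windowing.aggregation/count
 :window/window-key :event-time}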

Window Specification

See the Information Model chapter for an exact specification of what values the Window maps need to supply. Here we will describe what each of the keys mean.

key name description

:window/id

A unique identifier per window

:window/task

The workflow task over which the window operates

:window/type

Which type of window this is (fixed, sliding, etc)

:window/aggregation

The aggregation function to apply, as described above. If this operation is over a key, this is a vector, with the second element being the key.

:window/window-key

The key over which the range will be calculated

:window/range

The span of the window

:window/slide

The delay to wait to start a new window after the previous window

:window/init

The initial value required for some types of aggregation

:window/min-value

A strict minimum value that :window/window-key can ever be, default is 0.

:window/doc

An optional docstring explaining the window’s purpose

Triggers

In this section, we talk about Triggers. Triggers are a feature that interacts with Windows. Windows capture and bucket data over time. Triggers let you release the captured data in response to a variety of stimuli.

Summary

Windows capture data over time and place segments into discrete, possibly overlapping buckets. By itself, this is a relatively useless concept. In order to harness the information that has been captured and rolled up, we need to move it somewhere. Triggers let us interact with the state in each extent of a window.

Tip
Example project: aggregation

Trigger Types

Onyx ships with a number of trigger implementations that can be used out of the box. Each trigger fires in response to a particular stimulus. All triggers implemented in Onyx core fire at task completion. We outline each here and show an example of each in action.

:timer

This trigger sleeps for a duration of :trigger/period. When it is done sleeping, the :trigger/sync function is invoked with its usual arguments. The trigger goes back to sleep and repeats itself.

{:trigger/window-id :collect-segments
 :trigger/refinement :onyx.refinements/discarding
 :trigger/on :onyx.triggers/timer
 :trigger/period [3 :seconds]
 :trigger/sync ::write-to-dynamo
 :trigger/doc "Writes state to DynamoDB every 5 seconds, discarding intermediate state"}

:segment

Trigger wakes up in reaction to a new segment being processed. Trigger only fires once every :trigger/threshold segments. When the threshold is exceeded, the count of new segments goes back to 0, and the looping proceeds again in the same manner.

{:trigger/window-id :collect-segments
 :trigger/refinement :onyx.refinements/accumulating
 :trigger/on :onyx.triggers/segment
 :trigger/fire-all-extents? true
 :trigger/threshold [5 :elements]
 :trigger/sync ::write-to-stdout
 :trigger/doc "Writes the window contents to stdout every 5 segments"}

:punctuation

Trigger wakes up in reaction to a new segment being processed. Trigger only fires if :trigger/pred evaluates to true. The signature of :trigger/pred is of arity-2: trigger, state-event. Punctuation triggers are often useful to send signals through that indicate that no more data will be coming through for a particular window of time.

{:trigger/window-id :collect-segments
 :trigger/refinement :onyx.refinements/discarding
 :trigger/on :onyx.triggers/punctuation
 :trigger/pred ::trigger-pred
 :trigger/sync ::write-to-stdout
 :trigger/doc "Writes the window contents to std out if :trigger/pred is true for this segment"}

:watermark

Trigger wakes up in reaction to a new segment being processed. Trigger only fires if the value of :window/window-key in the segment exceeds the upper-bound in the extent of an active window. This is a shortcut function for a punctuation trigger that fires when any piece of data has a time-based window key that is above another extent, effectively declaring that no more data for earlier windows will be arriving.

{:trigger/window-id :collect-segments
 :trigger/refinement :onyx.refinements/discarding
 :trigger/on :onyx.triggers/watermark
 :trigger/sync ::write-to-stdout
 :trigger/doc "Writes the window contents to stdout when this window's watermark has been exceeded"}

:percentile-watermark

Trigger wakes up in reaction to a new segment being processed. Trigger only fires if the value of :window/window-key in the segment exceeds the lower-bound plus the percentage of the range as indicated by :trigger/watermark-percentage, a double greater than 0 and less than 1. This is an alternative to :watermark that allows you to trigger on most of the data arriving, not necessarily every last bit.

{:trigger/window-id :collect-segments
 :trigger/refinement :onyx.refinements/discarding
 :trigger/on :onyx.triggers/percentile-watermark
 :trigger/watermark-percentage 0.95
 :trigger/sync ::write-to-stdout
 :trigger/doc "Writes the window contents to stdout when this window's watermark is exceeded by 95% of its range"}

Refinement Modes

A refinement mode allows you to articulate what should happen to the state of a window extent after a trigger has been invoked.

:accumulating

Setting :trigger/refinement to :onyx.refinements/accumulating means that the state of a window extent is maintained exactly as is after the trigger invocation. This is useful if you want the answer to a query to "become more correct over time".

:discarding

Setting :trigger/refinement to :onyx.refinements/discarding means that the state of a window extent is set back to the value it was initialized with after the trigger invocation. You’d want to use this if the results from one periodic update bear no connection to subsequent updates.

Syncing

Onyx offers you the ultimate flexibility on what to do with your state during a trigger invocation. Set :trigger/sync to a fully qualified, namespaced keyword pointing to a function on the classpath at runtime. This function takes 5 arguments: The event map, the window map that this trigger is defined on, the trigger map, a state-event map, and the window state as an immutable value. Its return value is ignored.

This function is invoked when the trigger fires, and is used to do any arbitrary action with the window contents, such as sync them to a database. It is called once per window instance. In other words, if a fixed window exists with 5 instances, the firing of a Timer trigger will call the sync function 5 times. You can use lifecycles to supply any stateful connections necessary to sync your data. Supplied values from lifecycles will be available through the first parameter - the event map.
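
As a minimal sketch, a sync function with the five-argument signature described above might simply print the extent’s state (a real function would more likely write it to a database):

(defn write-to-stdout [event window trigger state-event extent-state]
  ;; extent-state is the immutable window contents for one extent.
  (println "Trigger fired for window" (:window/id window) "with state" extent-state))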

Trigger Specification

See the Information Model chapter for an exact specification of what values the Trigger maps need to supply. Here we will describe what each of the keys mean.

key name description

:trigger/window-id

A :window/id specified in the collection of windows.

:trigger/refinement

Fully qualified namespaced keyword for the mode of refinement e.g. :onyx.refinements/accumulating, :onyx.refinements/discarding.

:trigger/on

Fully qualified namespaced keyword for the trigger called to determine whether to fire as a reaction e.g. :onyx.triggers/segment.

:trigger/sync

Fully qualified namespaced keyword of a function to call with the state.

:trigger/fire-all-extents?

When true, fires every extent of a window in response to a trigger.

:trigger/doc

An optional docstring explaining the trigger’s purpose.

Aggregation & State Management

This section discusses state management and fault tolerance used in windowing/streaming joins.

Summary

Onyx provides the ability to perform updates to a state machine for segments which are calculated over windows. For example, a grouping task may accumulate incoming values for a number of keys over windows of 5 minutes. This feature is commonly used for aggregations, such as summing values, though it can be used to build more complex state machines.

State Example

;; Task definition
{:onyx/name :sum-all-ages
 :onyx/fn :clojure.core/identity
 :onyx/type :function
 :onyx/group-by-key :name
 :onyx/flux-policy :recover
 :onyx/min-peers 2
 :onyx/batch-size 20}

;; Window definition
{:window/id :sum-all-ages-window
 :window/task :sum-all-ages
 :window/type :global
 :window/aggregation [:your-sum-ns/sum :age]
 :window/window-key :event-time
 :window/range [1 :hour]
 :window/doc "Adds the :age key in all segments in 1 hour fixed windows"}

As segments are processed, an internal state within the calculated window is updated. In this case we are trying to sum the ages of the incoming segments.

Window aggregations are defined by a map containing the following keys:

Key Optional? Description

:aggregation/init

true

Fn (window) to initialise the state.

:aggregation/create-state-update

false

Fn (window, state, segment) to generate a serializable state machine update.

:aggregation/apply-state-update

false

Fn (window, state, entry) to apply state machine update entry to a state.

:aggregation/super-aggregation-fn

true

Fn (window, state-1, state-2) to combine two states in the case of two windows being merged.

The :window/aggregation value in the :sum-all-ages-window definition above refers to :your-sum-ns/sum, which is defined as follows:

(ns your-sum-ns)

(defn sum-init-fn [window]
  0)

(defn sum-aggregation-fn [window state segment]
  ; k is :age
  (let [k (second (:window/aggregation window))]
    [:set-value (+ state (get segment k))]))

(defn sum-application-fn [window state [changelog-type value]]
  (case changelog-type
    :set-value value))

;; sum aggregation referenced in window definition.
(def sum
  {:aggregation/init sum-init-fn
   :aggregation/create-state-update sum-aggregation-fn
   :aggregation/apply-state-update sum-application-fn})

Let’s try processing some example segments using this aggregation:

[{:name "John" :age 49}
 {:name "Madeline" :age 55}
 {:name "Geoffrey" :age 14}]

Processing these segments results in the following sequence of events:

Action              Result

Initial state       0
Incoming segment    {:name "John" :age 49}
Changelog entry     [:set-value 49]
Applied to state    49
Incoming segment    {:name "Madeline" :age 55}
Changelog entry     [:set-value 104]
Applied to state    104
Incoming segment    {:name "Geoffrey" :age 14}
Changelog entry     [:set-value 128]
Applied to state    128

This state can be emitted via triggers or another mechanism. By describing changelog updates as a vector with a log command, such as :set-value, the aggregation function can emit multiple types of state transitions if necessary.
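
For instance, a hypothetical aggregation might emit more than one changelog command and dispatch on the command when applying updates (a sketch only; the :reset-value command and :reset? key are made up for illustration):

(defn my-create-state-update [window state segment]
  ;; Choose a changelog command based on the segment's contents.
  (if (:reset? segment)
    [:reset-value 0]
    [:set-value (+ state (:age segment))]))

(defn my-apply-state-update [window state [changelog-type value]]
  (case changelog-type
    :set-value value
    :reset-value value))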

Fault Tolerance

To allow for full recovery after peer crashes, the window state must be replicated somewhere. As state updates occur, Onyx publishes the stream of changelog updates to a replicated log.

After the changelog entry is written to the replicated log, the segment is acked, ensuring that a segment is only cleared from the input source after the update to window states it caused has been fully written to the log. When a peer crash occurs, a new peer will be assigned to the task, and this peer will play back all of the changelog entries, and apply them to the state, starting with the initial state. As the changelog updates are read back in the same order that they were written, the full state will be recovered. Partial updates ensure that only minimal update data is written for each segment processed, while remaining correct on peer failure.

Exactly Once Aggregation Updates

Exactly once aggregation updates are supported via Onyx’s filtering feature. When a task’s catalog has :onyx/uniqueness-key set, this key is looked up in the segment and used as an ID key to determine whether the segment has been seen before. If it has previously been processed, and state updates have been persisted, then the segment is not re-processed. This key is persisted to the state log transactionally with the window changelog updates, so that previously seen keys can be recovered in case of a peer failure.
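
As a sketch, the uniqueness key is declared on the windowed task’s catalog entry; the :message-id key below is illustrative:

{:onyx/name :process-events
 :onyx/fn :my.ns/process-event
 :onyx/type :function
 ;; Segments whose :message-id has been seen before are not re-processed.
 :onyx/uniqueness-key :message-id
 :onyx/batch-size 20}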

See the section "Exactly Once Side-Effects" for discussion of why side-effects are impossible to achieve Exactly Once.

Considerations

In order to reduce memory consumption, uniqueness-key values are persisted to a local database, currently implemented with RocksDB. This database uses a bloom filter, and a memory cache, allowing Onyx to avoid hitting disk for most filter key checks.

In order to prevent unbounded increase in the size of the filter’s disk consumption, uniqueness-key values are bucketed based on recency, and the oldest bucket is expired as the newest is filled.

Several configuration parameters are available for the RocksDB-based local filter. The most relevant of these for general configuration are :onyx.rocksdb.filter/num-ids-per-bucket and :onyx.rocksdb.filter/num-buckets, which are the size and the number of buckets referenced above.

Parameter Description Default

:onyx.rocksdb.filter/peer-block-cache-size

RocksDB block cache size in bytes. Larger caches reduce the chance that the peer will need to check for the presence of a uniqueness key on disk. Defaults to 100MB.

104857600

:onyx.rocksdb.filter/bloom-filter-bits

Number of bloom filter bits to use per uniqueness key value

10

:onyx.rocksdb.filter/num-ids-per-bucket

Number of uniqueness key values that can exist in a RocksDB filter bucket.

10000000

:onyx.rocksdb.filter/num-buckets

Number of rotating filter buckets to use. Buckets are rotated every :onyx.rocksdb.filter/num-ids-per-bucket, with the oldest bucket being discarded if num-buckets already exist.

10

:onyx.rocksdb.filter/block-size

RocksDB block size. May be worth tuning depending on the size of your uniqueness-key values.

4096

:onyx.rocksdb.filter/compression

Whether to use compression in the RocksDB filter. It is recommended that :none is used unless your uniqueness keys are large and compressible.

:none

:onyx.rocksdb.filter/base-dir

Temporary directory to persist uniqueness filtering data.

/tmp/rocksdb_filter
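
These options live in the peer-config; a hedged sketch with illustrative values:

{:onyx.rocksdb.filter/num-ids-per-bucket 10000000
 :onyx.rocksdb.filter/num-buckets 10
 ;; Keep the filter on a fast local disk; this path is a placeholder.
 :onyx.rocksdb.filter/base-dir "/var/onyx/rocksdb_filter"}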

Exactly Once Side-Effects

Side-effects resulting from a segment being processed may occur more than once, as exactly once side-effects are impossible to achieve. Onyx guarantees that the window state updates resulting from a segment are performed exactly once; however, any side-effects that occur as a result of the segment being processed cannot be guaranteed to occur only once.

BookKeeper Implementation

State update changelog entries are persisted to BookKeeper, a replicated log server. An embedded BookKeeper server is included with Onyx. You can either use the embedded server or run BookKeeper alongside Onyx in a separate process.

BookKeeper ensures that changelog entries are replicated to multiple nodes, allowing for the recovery of windowing states upon the crash of a windowed task.

By default, Onyx’s BookKeeper replication stripes entries over 3 BookKeeper instances (the ensemble) and writes each entry to 3 instances (the quorum).

Running the embedded BookKeeper server

The embedded BookKeeper server can be started via the onyx.api/start-env api call, with an env-config where :onyx.bookkeeper/server? is true.

When running on a single node, you may wish to use BookKeeper without starting the multiple instances of BookKeeper required to meet the ensemble and quorum requirements. In this case you may start a local quorum (3) of BookKeeper servers by setting :onyx.bookkeeper/local-quorum? to true.
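
A minimal env-config sketch enabling the embedded server with a local quorum for single-node development (test purposes only):

{:onyx.bookkeeper/server? true
 ;; Start a local quorum of 3 BookKeeper instances; TEST purposes only.
 :onyx.bookkeeper/local-quorum? true
 :onyx.bookkeeper/local-quorum-ports [3196 3197 3198]}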

Embedded BookKeeper Configuration Parameters

Parameter Description Default

:onyx.bookkeeper/server?

Bool to denote whether to startup a BookKeeper instance on this node, for use in persisting Onyx state information.

false

:onyx.bookkeeper/base-ledger-dir

Directory to store BookKeeper’s ledger in. It is recommended that this is altered to somewhere fast, preferably on a different disk to the BookKeeper journal.

/tmp/bookkeeper_ledger

:onyx.bookkeeper/port

Port to startup this node’s BookKeeper instance on.

3196

:onyx.bookkeeper/local-quorum-ports

Ports to use for the local BookKeeper quorum.

[3196 3197 3198]

:onyx.bookkeeper/base-journal-dir

Directory to store BookKeeper’s journal in. It is recommended that this is altered to somewhere fast, preferably on a different disk to the BookKeeper ledger.

/tmp/bookkeeper_journal

:onyx.bookkeeper/local-quorum?

Bool to denote whether to startup a full quorum of BookKeeper instances on this node. Important: for TEST purposes only.

false

State Log Compaction

It is recommended that the state changelog is periodically compacted. When compaction occurs, the current state is written to a new ledger and all previous ledgers are swapped for the new compacted state ledger.

Compaction can currently only be performed within a task lifecycle for the windowed task. Be careful to choose the condition (see YOUR-CONDITION in the example below), as compacting too often is likely expensive. Compacting once every X segments is a reasonably good choice of condition.

(def compaction-lifecycle
  {:lifecycle/before-batch
   (fn [event lifecycle]
     (when YOUR-CONDITION
       (state-extensions/compact-log (:onyx.core/state-log event)
                                     event
                                     @(:onyx.core/window-state event)))
     {})})

BookKeeper Implementation Configuration

The BookKeeper state log implementation can be configured via the peer-config. Of particular note is :onyx.bookkeeper/ledger-password, which should generally be changed to a more secure value.

Parameter Description Default

:onyx.bookkeeper/read-batch-size

Number of BookKeeper ledger entries to read at a time when recovering state. The effective batch read of state entries is write-batch-size * read-batch-size.

50

:onyx.bookkeeper/ledger-id-written-back-off

Number of milliseconds to back off (sleep) after writing BookKeeper ledger id to the replica.

50

:onyx.bookkeeper/ledger-password

Password to use for Onyx state persisted to BookKeeper ledgers. It is highly recommended this is changed on a cluster-wide basis.

INSECUREDEFAULTPASSWORD

:onyx.bookkeeper/client-throttle

Tunable write throttle for BookKeeper ledgers.

30000

:onyx.bookkeeper/write-buffer-size

Size of the buffer through which BookKeeper ledger writes are buffered.

10000

:onyx.bookkeeper/client-timeout

BookKeeper client timeout.

60000

:onyx.bookkeeper/write-batch-size

Number of state persistence writes to batch into a single BookKeeper ledger entry.

20

:onyx.bookkeeper/ledger-quorum-size

The number of BookKeeper instances to which each entry will be written. For example, if you have a ledger-ensemble-size of 3, and a ledger-quorum-size of 2, the first write will be written to server1 and server2, the second write will be written to server2 and server3, etc.

3

:onyx.bookkeeper/ledger-ensemble-size

The number of BookKeeper instances over which entries will be striped. For example, if you have a ledger-ensemble-size of 3, and a ledger-quorum-size of 2, the first write will be written to server1 and server2, the second write will be written to server2 and server3, etc.

3

:onyx.bookkeeper/write-batch-timeout

Maximum amount of time to wait while batching BookKeeper writes, before writing the batch to BookKeeper. In the case of a full batch, the timeout will not be hit.

50
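
For example, the password might be overridden in the peer-config as part of your deployment configuration; the value below is obviously a placeholder:

{:onyx.bookkeeper/ledger-password "change-me-per-cluster"
 :onyx.bookkeeper/write-batch-size 20
 :onyx.bookkeeper/write-batch-timeout 50}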

Testing Onyx Jobs

In this chapter, we’ll cover what you need to know about testing your Onyx application code.

Overview

Onyx eschews customized abstractions for describing distributed programs. As a consequence, application code written on top of Onyx ends up being regular Clojure functions and data structures. Onyx remains out of the way, and you’re free to test your functions as you would any other Clojure program. It’s also useful to roll up your entire job and test it as it will run in production. This section is dedicated to giving you a set of idioms and tools for working with Onyx jobs during the development phase.

Automatic Resource Clean up

While it’s easy enough to run Onyx with an in-memory ZooKeeper instance (see the Environment chapter for how to do this), there are a host of development comforts that are missing when working on your Onyx code. One key pain point is the clean shutdown of resources on job completion or failure. Often when developing a new Onyx job, you’ll make lots of mistakes and your job will be killed. Before the next time you run your job, it’s a good idea to make sure your peers, peer group, and development environment all cleanly shut down. This can be a moderately tricky task at the repl, where a Thread may be interrupted by a Control-C sequence. To this end, Onyx provides an onyx.test-helper namespace with a handy macro known as with-test-env.

with-test-env takes three parameters: a symbol to bind the development environment var to, a sequence of [number-of-peers-to-start, env-config, peer-config], and a body of expressions. This macro starts a development environment with the requested number of peers, runs the body of expressions, and cleanly shuts the development environment down when either the body of expressions completes, or an exception is thrown (including ThreadInterruptedException). You’ll notice that this is an anaphoric macro. The macro creates a development environment, then binds it to the user supplied symbol. Using this macro, you don’t need to worry about ports remaining open that should have been closed on shutdown. You can safely interrupt Onyx at any point in time during the job execution.

In practice, this looks something like the following:

(deftest my-onyx-job-test
  (let [id (java.util.UUID/randomUUID)
        config (load-config)
        env-config (assoc (:env-config config) :onyx/tenancy-id id)
        peer-config (assoc (:peer-config config) :onyx/tenancy-id id)]
    (with-test-env [test-env [3 env-config peer-config]]
      (let [catalog ...
            workflow ...
            lifecycles ...]
        (onyx.api/submit-job peer-config
                             {:catalog catalog
                              :workflow workflow
                              :lifecycles lifecycles
                              :task-scheduler :onyx.task-scheduler/balanced})
        (let [results (take-segments! ...)
              expected ...]
          (is (= expected results)))))))

Code Reloading

Another painful part of writing asynchronous code in general is the matter of reloading code without restarting your repl. Something that plays particularly well with Onyx is clojure.tools.namespace. A pattern that we like to use is to create a user.clj file for the developer with the following contents:

(ns user
  (:require [clojure.tools.namespace.repl :refer [refresh set-refresh-dirs]]))

(set-refresh-dirs "src" "test")

(defn init [])

(defn start [])

(defn stop [])

(defn go []
  (init)
  (start))

(defn reset []
  (stop)
  (refresh))

When you want to reload your code, invoke the (reset) function. You can supply any extra application specific code in init, start, and stop. Combining this pattern with the with-test-env macro, you should virtually never need to restart your repl while developing Onyx code.

In-Memory I/O

Production level Onyx jobs talk to input streams like Kafka, or databases like Postgres. It’s not always helpful to run those pieces of infrastructure while developing your job. Early on, we like to just focus on what the shape of the data will look like, and use in-memory I/O with core.async instead of Kafka or what have you. There’s plenty of documentation on how to actually use the core.async plugin. The big question is - how does one most effectively use core.async for development, and more realistic I/O targets for staging and production?

Our approach leverages Onyx’s all-data job specification. We’ve found it helpful to isolate the parts of the catalog and lifecycles that will vary between different environments and use a "builder" pattern. We start with a "base job" - the pieces of the job that are invariants across all environments:

(defn base-job [mode onyx-id task-scheduler]
  (let [datomic-uri (my-env-helper/get-datomic-uri mode onyx-id)]
    {:workflow wf/parse-event-workflow
     :catalog (cat/build-parse-event-catalog datomic-uri)
     :flow-conditions (fcp/parser-flow-conditions :parse-log
                                                  [:write-parse-failures]
                                                  [:write-to-datomic])
     :lifecycles sl/base-parse-event-lifecycles
     :task-scheduler task-scheduler}))

The function which builds the base job takes a "mode", indicating what environment we should construct. We like to use keywords - things like :dev, :staging, :prod. Functions that receive these keywords are often multimethods which dispatch on mode, building the appropriate configuration files. In this example, we use the mode parameter to vary which Datomic URI we’re going to use in our catalog.

Next, we add in environment-specific code using a little utility function to update the base job that’s passed in as a parameter:

(defn add-metrics [job m-config]
  (my-fun-utilities/add-to-job job {:lifecycles
                                     (metrics/build-metrics-lifecycles
                                       (:riemann/host m-config)
                                       (:riemann/port m-config))}))

(defn add-logging [job]
  (my-fun-utilities/add-to-job job {:lifecycles sl/log-parse-event-lifecycles}))

Finally, we put them all together with a simple cond->:

(let [mode :dev
      log? true
      metrics? false]
  (cond-> (base-job mode onyx-id task-scheduler)
    log? (add-logging)
    metrics? (add-metrics m-config)))

It’s important to remember that we’re working with plain Clojure data here. The sky is the limit on how you can put the building blocks together!

Deployment

Onyx has no built-in mechanism for deployment. Rather, we let you deploy at your comfort. We’ll describe some approaches to doing this.

Deployment Style

Unlike Hadoop and Storm, Onyx does not have a built-in deployment feature. To deploy your application, you need to uberjar your program and place it on every node in your cluster. Start up the uberjar, passing it your shared Onyx ID and ZooKeeper address. Once it connects, you’ve successfully deployed!

We’ve chosen not to build in a deployment strategy because there are so many flexible approaches to handling deployment. S3, Docker, Mesos, Swarm, and Kubernetes are a few good choices. We’ll describe some of these strategies below.

Shared File System

Perhaps the most primitive deployment that you can use is a shared file system. Write a small script to SCP your uberjar to each of the nodes in your cluster and start the jar. You might use S3 for this, and a utility that allows you to manipulate a few SSH sessions in parallel. This is great for getting started on 3 nodes or so. You’ll want to use something a bit more elaborate as you go to production and get bigger, though.

Docker

We recommend packaging your uberjar into a Docker container. This is useful for locking in a specific Java version. Make sure you expose the ports that peers will be communicating on (this is configurable, see the peer configuration chapter). Once you have that down, you can upload your Docker image to DockerHub and use a simple script to pull down your Docker image and begin its execution. This is another step in the right direction, but read on to remove the scripting part of this task for real production systems.

Mesos and Marathon

Mesos and Marathon are a pair of applications that work together to manage your entire cluster of machines. I recommend deploying your Docker image onto Marathon, which will allow you to scale at runtime and avoid any scripting. Marathon ensures that if your Docker container goes down, it will be restarted. This is one of our favorite solutions for production scale, cluster-wide management at the moment.

Kubernetes

We are less familiar with Kubernetes as it’s a bit younger than Mesos and Marathon, but Kubernetes will deliver roughly the same functionality that Mesos and Marathon will.

Production Check List

A production check list is included in the Environment documentation.

Monitoring

When setting up an Onyx cluster in production, it’s helpful to know what Onyx itself is doing. Onyx exposes a set of callbacks that are triggered on certain actions.

Monitoring Hooks

When you’re taking Onyx to production, it’s not enough to know what your application-specific code is doing. You need to have insight into how Onyx is operating internally to be able to tune performance at an optimal level. Onyx allows you to register callbacks that are invoked when critical sections of code are executed in Onyx, returning a map that includes latency and data load size if appropriate.

Callback Specification

Callback functions that are given to Onyx for monitoring take exactly two parameters. The first parameter is the monitoring configuration, the second is a map. The return value of the function is ignored. The event names and keys in their corresponding maps are listed in a later section of this chapter, as well as a discussion about monitoring configuration. Here’s an example of a callback function:

(defn do-something [config {:keys [event latency bytes]}]
 ;; Write some metrics to Riemann or another type of service.
 (send-to-riemann event latency bytes))

Registering Callbacks

To register callbacks, create a map of event name key to callback function. This map must have a key :monitoring, mapped to keyword :custom. If you want to ignore all callbacks, you can instead supply :no-op. Monitoring is optional, so you can skip any monitoring code entirely if you don’t want to use this feature.

A registration example might look like:

(defn std-out [config event-map]
  ;; `config` is the `monitoring-config` var.
  (prn event-map))

(def monitoring-config
  {:monitoring :custom
   :zookeeper-write-log-entry std-out
   :zookeeper-read-log-entry std-out
   :zookeeper-write-catalog std-out
   :zookeeper-write-workflow std-out
   :zookeeper-write-flow-conditions std-out
   :zookeeper-force-write-chunk std-out
   :zookeeper-read-catalog std-out
   :zookeeper-read-lifecycles std-out
   :zookeeper-gc-log-entry std-out})

;; Add the monitoring config to the peer config before passing it to `start-peer-group`.
(def peer-group (onyx.api/start-peer-group (assoc peer-config :monitoring-config monitoring-config)))

Monitoring Events

This is the list of all monitoring events that you can register hooks for. The keys listed are present in the map that is passed to the callback function. The names of the events should readily identify what has taken place to trigger the callback.

Event Name Keys

:zookeeper-write-log-entry

:event, :latency, :bytes

:zookeeper-read-log-entry

:event, :latency, :bytes

:zookeeper-write-catalog

:event, :latency, :bytes

:zookeeper-write-workflow

:event, :latency, :bytes

:zookeeper-write-flow-conditions

:event, :latency, :bytes

:zookeeper-write-lifecycles

:event, :latency, :bytes

:zookeeper-write-windows

:event, :latency, :bytes

:zookeeper-write-triggers

:event, :latency, :bytes

:zookeeper-write-job-metadata

:event, :latency, :bytes

:zookeeper-write-task

:event, :latency, :bytes

:zookeeper-write-job-hash

:event, :latency, :bytes

:zookeeper-write-chunk

:event, :latency, :bytes

:zookeeper-write-job-scheduler

:event, :latency, :bytes

:zookeeper-write-messaging

:event, :latency, :bytes

:zookeeper-write-exception

:event, :latency, :bytes

:zookeeper-force-write-chunk

:event, :latency, :bytes

:zookeeper-write-origin

:event, :latency, :bytes

:zookeeper-read-catalog

:event, :latency

:zookeeper-read-workflow

:event, :latency

:zookeeper-read-flow-conditions

:event, :latency

:zookeeper-read-lifecycles

:event, :latency

:zookeeper-read-windows

:event, :latency

:zookeeper-read-triggers

:event, :latency

:zookeeper-read-job-metadata

:event, :latency

:zookeeper-read-task

:event, :latency

:zookeeper-read-job-hash

:event, :latency

:zookeeper-read-chunk

:event, :latency

:zookeeper-read-origin

:event, :latency

:zookeeper-read-job-scheduler

:event, :latency

:zookeeper-read-messaging

:event, :latency

:zookeeper-read-exception

:event, :latency

:zookeeper-gc-log-entry

:event, :latency, :position

:peer-retry-segment

:event, :latency

:peer-complete-message

:event, :latency

:peer-gc-peer-link

:event

:peer-backpressure-on

:event, :id

:peer-backpressure-off

:event, :id

:group-prepare-join

:event, :id

:group-notify-join

:event, :id

:group-accept-join

:event, :id

:peer-send-bytes

:event, :id, :bytes

Performance Tuning

This chapter details a few tips for getting your Onyx cluster to run as fast as possible.

Onyx

  • Use Clojure 1.7+. 1.7 and 1.8 have some considerable performance enhancements compared to 1.6.

  • SETUP METRICS. We cannot stress this point enough. Please read the guide at onyx-metrics to get started.

  • Set the timbre log level to elide logging calls in production. This should be done at compile time, i.e. when AOT compiling, or when running via Leiningen. You can do so using the environment variable: TIMBRE_LOG_LEVEL=info command_to_start_your_peers_or_compile_AOT.

  • Turn off Clojure assertions. Onyx uses assertions liberally to find errors early, however these do have an associated cost. Use :global-vars {*assert* false} in your Leiningen profile, or (set! *assert* false) in your system bootup namespace.

  • Use JVM_OPTS="-server" in production.

  • Use JVM_OPTS="-XX:+UseG1GC". The G1GC has more predictable performance that can improve latency, though may decrease throughput by a small amount. It is strongly recommended you use this GC, unless you have a reason not to.

  • Check that you do not use reflection in any of the code running in your Onyx tasks’ critical paths, e.g. in :onyx/fn, before/after-batch lifecycles, aggregations, etc. You can check for reflection by setting :global-vars {*warn-on-reflection* true} in your lein profile.

  • Tweak the :onyx/batch-size for the job’s tasks. A batch size of 20-100 segments is a reasonable start that will amortise the cost of Onyx’s task lifecycle against latency.

  • For small segments, batch multiple segments into a single segment, and treat each new segment as a rolled up batch.

  • Tweak the batch timeout in each catalog entry to trade off increased latency for higher throughput. Counterintuitively, note that increased batch sizes can actually decrease average case latency, as improved throughput can decrease queuing effects. A sketch of a catalog entry with both knobs appears after this list.

  • The peer-config option :onyx.messaging/allow-short-circuit? (Peer Config), should be set to true in production.

  • Profile your code locally using Java Flight Recorder / Mission Control. The easiest way to get started is to create a benchmark test and use the following JVM_OPTS: -XX:+UnlockCommercialFeatures -XX:+FlightRecorder -XX:+UnlockDiagnosticVMOptions -XX:StartFlightRecording=duration=1080s,filename=localrecording.jfr. This will save a file to localrecording.jfr, which you can open in Mission Control (which can be started via the command jmc). You will need Oracle’s JVM to use this functionality.

  • Start roughly 1 virtual peer per core per machine. When using Aeron messaging, cores = virtual peers + subscribers is a good guideline. This recommendation is a good starting point, however may not hold true when some virtual peers are largely idle, or spend much of their time I/O blocked.

  • Use a custom compression scheme, rather than Nippy. You can configure custom compression/decompression functions via the peer configuration.

  • Increase the number of acker peers through the peer configuration as your cluster gets larger

  • Tune the number of Aeron subscriber threads, if serialization is a large proportion of work performed in tasks.
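
As referenced in the batching tips above, here is a hedged sketch of a catalog entry with both knobs; the values are illustrative starting points, not recommendations:

{:onyx/name :transform
 :onyx/fn :my.ns/transform
 :onyx/type :function
 ;; Larger batches amortise per-batch lifecycle overhead.
 :onyx/batch-size 50
 ;; Milliseconds to wait for a full batch before processing a partial one.
 :onyx/batch-timeout 50}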

ZooKeeper

  • Put the ZooKeeper journal on its own physical volume

ZooKeeper tends to not get a huge amount of traffic, so this probably won’t offer a huge performance boost. It’s helpful if you’re trying to make your processing latency as predictable as possible, though.

Messaging

Aeron

Ensure you disable the embedded media driver, and instead use an independent media driver (see Media Driver)

When testing performance with a single node using the Aeron messaging layer, connection short circuiting may cause very misleading results.

The peer-config option :onyx.messaging/allow-short-circuit? (Peer Config) should be set to false for realistic performance testing when only a single node is available for testing. Ensure this option is set to true when operating in production.
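
For example, assuming an existing base peer-config map named my-peer-config (a placeholder), the toggle might look like this sketch:

;; single-node performance testing: force messages through Aeron
(def perf-test-peer-config
  (assoc my-peer-config :onyx.messaging/allow-short-circuit? false))

;; production: let colocated virtual peers bypass the network
(def production-peer-config
  (assoc my-peer-config :onyx.messaging/allow-short-circuit? true))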

Please refer to the Aeron messaging section for general discussion of the Aeron messaging implementation and its characteristics.

Environment

In this chapter, we’ll discuss what you need to set up development and production environments.

Development Environment

Dependencies

  • Java 8+

  • Clojure 1.7+

Explanation

One of the primary design goals of Onyx is to make the development environment as close as possible to production - without making the developer run a lot of services locally. A development environment in Onyx merely needs Clojure 1.7+ to operate. A ZooKeeper server is spun up in memory via Curator, so you don’t need to install ZooKeeper locally if you don’t want to.

Production Environment

Dependencies

  • Java 8+

  • Clojure 1.7+

  • ZooKeeper 3.4.5+

Multi-node & Production Checklist

Congratulations! You’re going to production, or at least testing your Onyx jobs with a multi-node setup.

We strongly recommend you run through this checklist before you do so, as it will likely save you a lot of time.

  • Ensure your JVM is running with JVM opts -server: performance will be greatly decreased if you do not run Onyx with at least the -server JVM option.

  • Disable embedded ZooKeeper: when onyx.api/start-env is called with a config where :zookeeper/server? true, it will start an embedded ZooKeeper. :zookeeper/server? should be set to false in production.

  • Set up production ZooKeeper: a full ZooKeeper ensemble should be used in lieu of the testing ZooKeeper embedded in Onyx.

  • Increase maximum ZK connections: Onyx establishes a large number of ZooKeeper connections, so if the limit is too low you will see an exception like the following: WARN org.apache.zookeeper.server.NIOServerCnxn: Too many connections from /127.0.0.1 - max is 10. Increase the number of connections in zoo.cfg via maxClientCnxns. This should be set to a number moderately larger than the number of virtual peers that you will start.

  • Configure ZooKeeper address to point at an ensemble: :zookeeper/address should be set in your peer-config, e.g. :zookeeper/address "server1:2181,server2:2181,server3:2181".

  • Ensure all nodes are using the same :onyx/tenancy-id: :onyx/tenancy-id in the peer-config is used to denote which cluster a virtual peer should join. If all your nodes do not use the same :onyx/tenancy-id, they will not be a part of the same cluster and will not run the same jobs. Any job submitted to a cluster must also use the same :onyx/tenancy-id to ensure that that cluster runs the job.

  • Do not use core.async tasks: switch all input or output tasks away from core.async, as it is not a suitable medium for multi-node use and will result in many issues when used in this way. The Kafka plugin is one recommended alternative.

  • Test on a single node without short circuiting: when :onyx.messaging/allow-short-circuit? is true, Aeron messaging will be short circuited completely. To test messaging locally as if it were in production, set :onyx.messaging/allow-short-circuit? false.

  • Ensure short circuiting is enabled in production: short circuiting improves performance by locally messaging where possible. Ensure :onyx.messaging/allow-short-circuit? true is set in the peer-config on production.

  • Set messaging bind address: the messaging layer must be bound to the network interface used by peers to communicate. To do so, set :onyx.messaging/bind-addr in peer-config to a string defining the interface’s IP. On AWS, this IP can easily be obtained via (slurp "http://169.254.169.254/latest/meta-data/local-ipv4"). A combined peer-config sketch covering this and related items appears after this checklist.

  • Is your bind address external facing?: If your bind address is something other than the one accessible to your other peers (e.g. docker, without net=host), then you will need to define an external address to advertise. This can be set via :onyx.messaging/external-addr in peer-config.

  • Open UDP ports for Aeron: Aeron requires the port defined in :onyx.messaging/peer-port to be open for UDP traffic.

  • Set up an external Aeron Media Driver: if messaging performance is a factor, it is recommended that the Aeron Media Driver is run out of process. First, disable the embedded driver by setting :onyx.messaging.aeron/embedded-driver? false. An example out-of-process media driver is included in lib-onyx. This media driver can be started via lein run -m, or via an uberjar, in each case referencing the namespace that contains its main entry point. Ensure that the media driver is started with JVM opts -server.

  • Set up metrics: when in production, it is essential that you are able to measure retried messages, input message complete latency, throughput and batch latency. Set up Onyx to use onyx-metrics. We recommend at the very least using the timbre logging plugin, which is easy to set up.
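
To tie several of these items together, here is a hedged sketch of a production-oriented peer-config; the tenancy id, ZooKeeper ensemble, peer port, and job scheduler choice are assumptions for illustration only.

(def production-peer-config
  {:onyx/tenancy-id "prod-cluster"                       ;; identical on every node and in submitted jobs
   :zookeeper/address "zk1:2181,zk2:2181,zk3:2181"       ;; external ensemble; no embedded server
   :onyx.peer/job-scheduler :onyx.job-scheduler/balanced
   :onyx.messaging/impl :aeron
   :onyx.messaging/peer-port 40200                       ;; open this port for UDP between peers
   :onyx.messaging/bind-addr (slurp "http://169.254.169.254/latest/meta-data/local-ipv4") ;; AWS-only lookup
   :onyx.messaging/allow-short-circuit? true             ;; enable local short circuiting in production
   :onyx.messaging.aeron/embedded-driver? false})        ;; media driver runs out of process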

ZooKeeper

Environment Launch of In-Memory ZooKeeper

To launch an in-memory ZooKeeper instance, add :zookeeper/server? true to the environment options. Also, specify :zookeeper.server/port <my port> so that Curator knows what port to start running the server on.

If your deployment throws an exception and doesn’t shut down ZooKeeper, it will remain open. Firing up the environment again will cause a port collision, so be sure to restart your repl in that case.

Peer Connection to In-Memory ZooKeeper

Add :zookeeper/address "127.0.0.1:<my port>" to the peer options as usual. In-memory ZooKeeper is completely opaque to the peer.

Example

Here’s an example of using ZooKeeper in-memory, with some non-ZooKeeper required parameters elided.

(def env-config
  {:zookeeper/address "127.0.0.1:2182"
   :zookeeper/server? true
   :zookeeper.server/port 2182
   :onyx/tenancy-id id})

(def peer-opts
  {:zookeeper/address "127.0.0.1:2182"
   :onyx/tenancy-id id})

Networking / Firewall

Messaging requires the UDP port set via :onyx.messaging/peer-port to be open.

All peers require the ability to connect to the ZooKeeper instances over TCP.

Explanation

Running a ZooKeeper cluster is a requirement for a lot of fault tolerant systems. See this link for getting set up. I won’t go into detail since this is a particularly common set up. We recommend using Exhibitor to manage clustered ZooKeeper.

Example

Notice that all we’re doing is extending the address string to include more host:port pairs. This uses the standard ZooKeeper connection string, so you can use authentication here too if you need it.

(def peer-opts
  {...
   :zookeeper/address "10.132.8.150:2181,10.132.8.151:2181,10.132.8.152:2181"
   ...})

Peer Configuration

This chapter describes all of the options available to configure the virtual peers and development environment.

Base Configuration

key name           | type
:onyx/tenancy-id   | any
:zookeeper/address | string

Environment Only

key name                            | type    | optional?
:zookeeper/server?                  | boolean | Yes
:zookeeper.server/port              | int     | Yes
:onyx.bookkeeper/server?            | boolean | Yes
:onyx.bookkeeper/local-quorum?      | boolean | Yes
:onyx.bookkeeper/local-quorum-ports | [int]   | Yes
:onyx.bookkeeper/port               | int     | Yes
:onyx.bookkeeper/base-journal-dir   | string  | Yes
:onyx.bookkeeper/base-ledger-dir    | string  | Yes

Peer Only

Base Configuration

key name                                   | type     | default
:onyx.peer/inbox-capacity                  | int      | 1000
:onyx.peer/outbox-capacity                 | int      | 1000
:onyx.peer/retry-start-interval            | int      | 2000
:onyx.peer/join-failure-back-off           | int      | 250
:onyx.peer/drained-back-off                | int      | 400
:onyx.peer/peer-not-ready-back-off         | int      | 2000
:onyx.peer/job-not-ready-back-off          | int      | 500
:onyx.peer/fn-params                       | map      | {}
:onyx.peer/tags                            | vector   | []
:onyx.peer/backpressure-check-interval     | int      | 10
:onyx.peer/backpressure-low-water-pct      | int      | 30
:onyx.peer/backpressure-high-water-pct     | int      | 60
:onyx.zookeeper/backoff-base-sleep-time-ms | int      | 1000
:onyx.zookeeper/backoff-max-sleep-time-ms  | int      | 30000
:onyx.zookeeper/backoff-max-retries        | int      | 5
:onyx.messaging/inbound-buffer-size        | int      | 20000
:onyx.messaging/completion-buffer-size     | int      | 1000
:onyx.messaging/release-ch-buffer-size     | int      | 10000
:onyx.messaging/retry-ch-buffer-size       | int      | 10000
:onyx.messaging/peer-link-gc-interval      | int      | 90000
:onyx.messaging/peer-link-idle-timeout     | int      | 60000
:onyx.messaging/ack-daemon-timeout         | int      | 60000
:onyx.messaging/ack-daemon-clear-interval  | int      | 15000
:onyx.messaging/decompress-fn              | function | onyx.compression.nippy/decompress
:onyx.messaging/compress-fn                | function | onyx.compression.nippy/compress
:onyx.messaging/impl                       | keyword  | :aeron
:onyx.messaging/bind-addr                  | string   | nil
:onyx.messaging/peer-port                  | int      | nil
:onyx.messaging/allow-short-circuit?       | boolean  | true
:onyx.messaging.aeron/embedded-driver?     | boolean  | true
:onyx.messaging.aeron/subscriber-count     | int      | 2
:onyx.messaging.aeron/write-buffer-size    | int      | 1000
:onyx.messaging.aeron/poll-idle-strategy   | keyword  | :high-restart-latency
:onyx.messaging.aeron/offer-idle-strategy  | keyword  | :high-restart-latency

:onyx.peer/inbox-capacity

Maximum number of messages to try to prefetch and store in the inbox, since reading from the log happens asynchronously.

:onyx.peer/outbox-capacity

Maximum number of messages to buffer in the outbox for writing, since writing to the log happens asynchronously.

:onyx.peer/retry-start-interval

Number of ms to wait before trying to reboot a virtual peer after failure.

:onyx.peer/drained-back-off

Number of ms to wait before trying to complete the job if all input tasks have been exhausted. Completing the job may not succeed if the cluster configuration is being shifted around.

:onyx.peer/peer-not-ready-back-off

Number of ms to back off and wait before retrying the call to start-task? lifecycle hook if it returns false.

:onyx.peer/job-not-ready-back-off

Number of ms to back off and wait before trying to discover configuration needed to start the subscription after discovery failure.

:onyx.peer/join-failure-back-off

Number of ms to wait before trying to rejoin the cluster after a previous join attempt has aborted.

:onyx.peer/fn-params

A map of keywords to vectors. Keywords represent task names, vectors represent the first parameters to apply to the function represented by the task. For example, {:add [42]} for task :add will call the function underlying :add with (f 42 <segment>).
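
For example, assuming a hypothetical :add task whose function adds a number onto each segment, the wiring might look like this sketch (task name, function, and segment shape are assumptions):

(defn add [n segment]
  ;; n comes from :onyx.peer/fn-params, segment from the upstream task
  (update segment :total (fnil + 0) n))

;; peer-config fragment: every segment on task :add is processed as (add 42 segment)
{:onyx.peer/fn-params {:add [42]}}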

:onyx.peer/tags

Tags which denote the capabilities of this peer in terms of user-defined functionality. Must be specified as a vector of keywords. This is used in combination with :onyx/required-tags in the catalog to force tasks to run on certain sets of machines.

:onyx.zookeeper/backoff-base-sleep-time-ms

Initial amount of time in ms to wait between ZooKeeper connection retries.

:onyx.zookeeper/backoff-max-sleep-time-ms

Maximum amount of time in ms to sleep on each retry.

:onyx.zookeeper/backoff-max-retries

Maximum number of times to retry connecting to ZooKeeper.

:onyx.peer/backpressure-low-water-pct

Percentage of messaging inbound-buffer-size that constitutes a low water mark for backpressure purposes.

:onyx.peer/backpressure-high-water-pct

Percentage of messaging inbound-buffer-size that constitutes a high water mark for backpressure purposes.

:onyx.peer/backpressure-check-interval

Number of ms between checking whether the virtual peer should notify the cluster of backpressure-on/backpressure-off.

:onyx.messaging/inbound-buffer-size

Number of messages to buffer in the core.async channel for received segments.

:onyx.messaging/completion-buffer-size

Number of messages to buffer in the core.async channel for completing messages on an input task.

:onyx.messaging/release-ch-buffer-size

Number of messages to buffer in the core.async channel for released completed messages.

:onyx.messaging/retry-ch-buffer-size

Number of messages to buffer in the core.async channel for retrying timed-out messages.

:onyx.messaging/peer-link-gc-interval

The interval in milliseconds to wait between closing idle peer links.

:onyx.messaging/peer-link-idle-timeout

The maximum amount of time that a peer link can be idle (not looked up in the state atom for usage) before it is eligible to be closed. The connection will be reopened from scratch the next time it is needed.

:onyx.messaging/ack-daemon-timeout

Number of milliseconds that an ack value can go without being updated on a daemon before it is eligible to time out.

:onyx.messaging/ack-daemon-clear-interval

Number of milliseconds between periodic clearing of ack-vals that have timed out in the daemon.

:onyx.messaging/decompress-fn

The Clojure function to use for messaging decompression. Receives one argument - a byte array. Must return the decompressed value of the byte array.

:onyx.messaging/compress-fn

The Clojure function to use for messaging compression. Receives one argument - a sequence of segments. Must return a byte array representing the segment seq.
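
As a sketch of the expected shapes, the functions below simply delegate to Nippy; a custom scheme would swap in its own serializer while keeping the same contracts (compress: segment seq -> byte array, decompress: byte array -> value). The namespace name is an assumption.

(ns my.app.compression
  (:require [taoensso.nippy :as nippy]))

(defn compress [segments]
  ;; must return a byte array
  (nippy/freeze segments))

(defn decompress [^bytes bs]
  ;; must return the decompressed value
  (nippy/thaw bs))

;; peer-config fragment:
;; {:onyx.messaging/compress-fn   my.app.compression/compress
;;  :onyx.messaging/decompress-fn my.app.compression/decompress}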

:onyx.messaging/impl

The messaging protocol to use for peer-to-peer communication.

:onyx.messaging/bind-addr

An IP address to bind the peer to for messaging. Defaults to nil, in which case the peer binds to the external IP returned by calling http://checkip.amazonaws.com.

:onyx.messaging/peer-port

The port that peers should use to communicate.

:onyx.messaging/allow-short-circuit?

A boolean denoting whether to allow virtual peers to short circuit networked messaging when colocated with the other virtual peer. Short circuiting allows for direct transfer of messages to a virtual peer’s internal buffers, which improves performance where possible. This configuration option is primarily for use in performance testing, as peers will not generally be able to short circuit messaging after scaling to many nodes.

:onyx.messaging.aeron/embedded-driver?

A boolean denoting whether an Aeron media driver should be started up with the environment. See Aeron Media Driver for an example for how to start the media driver externally.

:onyx.messaging.aeron/subscriber-count

The number of Aeron subscriber threads that receive messages for the peer-group. As peer-groups are generally configured per-node (machine), this setting can bottleneck receive performance if many virtual peers are used per-node, or are receiving and/or de-serializing large volumes of data. A good guideline is num cores = num virtual peers + num subscribers, assuming virtual peers are generally being fully utilised.

:onyx.messaging.aeron/write-buffer-size

Size of the write queue for the Aeron publication. Writes to this queue will currently block once full.

:onyx.messaging.aeron/poll-idle-strategy

The Aeron idle strategy to use when polling for new messages. Currently, two choices are available: :high-restart-latency and :low-restart-latency. :low-restart-latency may result in lower message latency, at the cost of higher CPU usage and potentially reduced throughput.

:onyx.messaging.aeron/offer-idle-strategy

The Aeron idle strategy to use when offering messages to another peer. Currently, two choices are available: :high-restart-latency and :low-restart-latency. :low-restart-latency may result in lower message latency, at the cost of higher CPU usage and potentially reduced throughput.

Peer Full Example

(def peer-opts
  {:onyx/tenancy-id "df146eb8-fd6e-4903-847e-9e748ca08021"
   :zookeeper/address "127.0.0.1:2181"
   :onyx.peer/inbox-capacity 2000
   :onyx.peer/outbox-capacity 2000
   :onyx.peer/retry-start-interval 4000
   :onyx.peer/join-failure-back-off 500
   :onyx.peer/drained-back-off 400
   :onyx.peer/peer-not-ready-back-off 5000
   :onyx.peer/job-not-ready-back-off 1000
   :onyx.peer/fn-params {:add [42]}
   :onyx.peer/zookeeper-timeout 10000
   :onyx.messaging/completion-buffer-size 2000
   :onyx.messaging/release-ch-buffer-size 50000
   :onyx.messaging/retry-ch-buffer-size 100000
   :onyx.messaging/ack-daemon-timeout 90000
   :onyx.messaging/ack-daemon-clear-interval 15000
   :onyx.messaging/decompress-fn onyx.compression.nippy/decompress
   :onyx.messaging/compress-fn onyx.compression.nippy/compress
   :onyx.messaging/impl :aeron
   :onyx.messaging/bind-addr "localhost"
   :onyx.messaging/peer-port-range [50000 60000]
   :onyx.messaging/peer-ports [45000 45002 42008]})

Information Model

The information model is described in the Cheat Sheet, and is also available programmatically as a nested map.

Scheduling

Onyx offers fine-grained control of how many peers are allocated to particular jobs and tasks. This section outlines how to use the built-in schedulers.

Allocating Peers to Jobs and Tasks

In a masterless design, there is no single entity that assigns tasks to peers. Instead, peers need to contend for tasks to execute as jobs are submitted to Onyx. Conversely, as peers are added to the cluster, the peers must "shift" to distribute the workload across the cluster. Onyx ships with out-of-the-box job and task allocation policies. End users can change the levels of fairness that each job gets with respect to cluster power. And remember, one virtual peer executes at most one task.

Job Schedulers

Each running Onyx instance is configured with exactly one job scheduler. The purpose of the job scheduler is to coordinate which jobs peers are allowed to volunteer to execute. There are a few different kinds of schedulers, listed below. To use each, configure the Peer Options map with key :onyx.peer/job-scheduler to the value specified below.

Greedy Job Scheduler

The Greedy job scheduler allocates all peers to each job in the order that it was submitted. For example, suppose you had 100 virtual peers and you submitted two jobs - job A and job B. With a Greedy scheduler, all 100 peers would be allocated to job A. If job A completes, all 100 peers will then execute tasks for job B. This probably isn’t desirable when you’re running streaming workflows, since they theoretically never end.

To use this scheduler, set key :onyx.peer/job-scheduler to :onyx.job-scheduler/greedy in the Peer Options.
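
For example, a peer options map selecting the Greedy scheduler might look like this sketch (the tenancy id and ZooKeeper address are placeholders):

(def peer-opts
  {:onyx/tenancy-id "my-cluster"
   :zookeeper/address "127.0.0.1:2181"
   :onyx.peer/job-scheduler :onyx.job-scheduler/greedy})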

Peer Addition

In the event that a peer joins the cluster while the Greedy job scheduler is running, that new peer will be allocated to the current job that is being run greedily.

Peer Removal

In the event that a peer leaves the cluster while the Greedy job scheduler is running, no peers will be shifted off of the job that is greedily running.

Job Addition

If a job is submitted while this scheduler is running, no peers will be allocated to this job. The only exception to this rule is if no jobs are currently running. In this case, all peers will be allocated to this job.

Job Removal

If a job is completed or otherwise canceled, all of the peers that executed that job will move to the job that was submitted after it.

Balanced Job Scheduler

The Balanced job scheduler allocates peers in a rotating fashion to jobs that were submitted. For example, suppose that you had 100 virtual peers (virtual peer 1, virtual peer 2, …​ virtual peer 100) and you submitted two jobs - job A and job B. With a Balanced scheduler, both jobs will end up with 50 virtual peers allocated to each. This scheduler begins allocating by selecting the first job submitted.

To use this scheduler, set key :onyx.peer/job-scheduler to :onyx.job-scheduler/balanced in the Peer Options.

Peer Addition

In the event that a peer joins the cluster while the Balanced scheduler is running, that new peer will be allocated to the job that most evenly balances the cluster according to the number of jobs divided by the number of peers. If there is a tie, the new peer is added to the earliest submitted job.

Peer Removal

In the event that a peer leaves the cluster while the Balanced scheduler is running, the peers across all jobs will be rebalanced to evenly distribute the workload.

Job Addition

If a job is submitted while this scheduler is running, the entire cluster will be rebalanced. For example, if job A has all 100 peers executing its task, and job B is submitted, 50 peers will move from job A to job B.

Job Removal

If a job is completed or otherwise canceled while this scheduler is running, the entire cluster will be rebalanced. For example, if job A, B, and C had 20 peers executing each of its tasks (60 peers total), and job C finished, job A would gain 10 peers, and job B would gain 10 peers.

Percentage Job Scheduler

The Percentage job scheduler allows jobs to be submitted with a percentage value. The percentage value indicates what percentage of the cluster will be allocated to this job. The use case for this scheduler is when you have a static number of jobs and a varying number of peers. For example, if you have 2 jobs - A and B - you’d give each of these percentage values, say 70% and 30%, respectively. If you had 100 virtual peers running, 70 would be allocated to A, and 30 to B. If you then added 100 more peers to the cluster, job A would be allocated 140 peers, and job B 60. This dynamic scaling is a big step forward over statically configuring slots, which is the norm in ecosystems like Hadoop and Storm.

If there aren’t enough peers to satisfy the percentage values of all the jobs, this scheduler will allocate with priority to jobs with the highest percentage value. When percentage values are equal, the earliest submitted job will get priority. In the event that jobs are submitted, and the total percentage value exceeds 100%, the earliest submitted jobs that do not exceed 100% will receive peers. Jobs that go beyond it will not receive peers. For example, if you submitted jobs A, B, and C with 70%, 30%, and 20% respectively, jobs A and B would receive peers, and C will not be allocated any peers until either A or B completes.

If the total percentages of all submitted jobs doesn’t sum up to 100%, the job with the highest percentage value will receive the extra peers. When percentage values are equal, the earliest submitted job will get priority.

If the algorithm determines that any job should receive a number of peers that is less than the minimum number it needs to execute, that job receives no peers.

To use this scheduler, set key :onyx.peer/job-scheduler to :onyx.job-scheduler/percentage in the Peer Options.

Peer Addition

In the event that a peer joins the cluster while the Percentage job scheduler is running, the entire cluster will rebalance.

Peer Removal

In the event that a peer leaves the cluster while the Percentage job scheduler is running, the entire cluster will rebalance.

Job Addition

If a job is submitted while this scheduler is running, the entire cluster will be rebalanced.

Job Removal

If a job is completed or otherwise canceled while this scheduler is running, the entire cluster will be rebalanced.

Task Schedulers

Each Onyx job is configured with exactly one task scheduler. The task scheduler is specified at the time of calling submit-job. The purpose of the task scheduler is to control the order in which available peers are allocated to which tasks. There are a few different Task Scheduler implementations, listed below. To use each, call onyx.api/submit-job. The second argument of this function is a map. Supply a :task-scheduler key and map it to the value specified below.

Balanced Task Scheduler

The Balanced Scheduler takes a topological sort of the workflow for a specific job. As peers become available, this scheduler assigns tasks to peers in a rotating order. For example, if a workflow has a topological sort of tasks A, B, C, and D, this scheduler assigns each peer to tasks A, B, C, D, A, B, C, D, …​ and so on.

To use, set :task-scheduler in submit-job to :onyx.task-scheduler/balanced.
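
A sketch of the corresponding submit-job call, assuming workflow, catalog, and lifecycles vars are already defined elsewhere:

(onyx.api/submit-job
  peer-config
  {:workflow workflow
   :catalog catalog
   :lifecycles lifecycles
   :task-scheduler :onyx.task-scheduler/balanced})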

Peer Removal

If a peer fails, or is otherwise removed from the cluster, the Task scheduler defers to the Job scheduler to rebalance the cluster. If a new peer is added to this task as a result of a peer failing in another job, it is assigned the next task in the balanced sequence.

Max Peer Parameter

With the Balanced Task Scheduler, each entry in the catalog can specify a key of :onyx/max-peers with an integer value > 0. When this key is set, Onyx will never assign that task more than that number of peers. The Balanced scheduler will simply skip the task during allocation when more peers are available, and continue assigning peers to the other tasks in a balanced manner.
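
For example, a catalog entry capping a hypothetical :process-events function task at 4 peers might look like this sketch (the task name, function, and batch size are assumptions):

{:onyx/name :process-events
 :onyx/fn :my.ns/process-events
 :onyx/type :function
 :onyx/max-peers 4
 :onyx/batch-size 20}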

Percentage Task Scheduler

The Percentage Scheduler takes a set of tasks, all of which must be assigned a percentage value (:onyx/percentage) in the corresponding catalog entries. The percentage values must add up to 100 or less. Percentage values may be integers between 1 and 99, inclusive. This scheduler will allocate peers to this job’s tasks in proportion to the specified percentages. As more or fewer peers join the cluster, allocations will automatically scale. For example, if a job has tasks A, B, and C with 70%, 20%, and 10% specified as their percentages, and there are 10 peers, task A receives 7 peers, B 2 peers, and C 1 peer.

This scheduler handles corner cases (fractions of peers) in the same way as the Percentage Job Scheduler. See that documentation for a full description.

To use, set :task-scheduler in submit-job to :onyx.task-scheduler/percentage.
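
A sketch of the catalog fragments for the 70/20/10 example above, keeping only the scheduling-related keys (all other required catalog keys are omitted for brevity):

[{:onyx/name :A :onyx/percentage 70}
 {:onyx/name :B :onyx/percentage 20}
 {:onyx/name :C :onyx/percentage 10}]

;; submitted with:
;; (onyx.api/submit-job peer-config
;;   {:workflow workflow :catalog catalog
;;    :task-scheduler :onyx.task-scheduler/percentage})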

Peer Removal

If a peer fails, or is otherwise removed from the cluster, the Task scheduler rebalances all the peers for even distribution.

Colocation Task Scheduler

The Colocation Scheduler takes all of the tasks for a job and, if possible, assigns them to the peers on a single physical machine represented by the same peer group. If a job has 4 tasks and the cluster is one machine with 5 peers, 4 peers will become active. If that machine had 8 peers, all 8 would become active, as this scheduler operates in peer chunks that are divisible by the task size. If more machines are capable of executing the entire job, they will also be used.

This scheduler is useful for dramatically increasing the performance of jobs whose latency is bound by the cost of transmitting data over the network between tasks. Using this scheduler with peer short circuiting ensures that segments are never serialized and never cross the network between tasks (with the exception of grouping tasks). Onyx’s usual fault tolerance mechanisms are still used to ensure that data is processed in the presence of machine failure.

To use, set :task-scheduler in submit-job to :onyx.task-scheduler/colocated.

To use colocation, but to disable the scheduler’s affinity to always send segments to a local peer, set :onyx.task-scheduler.colocated/only-send-local? to false in the peer config. This is desirable when optimal performance depends on the uniformity of tasks being evenly assigned to machines in your cluster, but strictly local execution is not helpful for performance.

Peer Addition

If a peer is added to the cluster and its machine is capable of executing all the tasks for this job, the entire machine will be used - provided that it falls into the pool of peers eligible to execute this job, per the job scheduler’s prerogative.

Peer Removal

If a peer is removed, all the peers associated with this job’s tasks for this chunk of peers will stop executing their tasks.

Tags

It’s often the case that a set of machines in your cluster are privileged in some way. Perhaps they are running special hardware, or they live in a specific data center, or they have a license to use a proprietary database. Sometimes, you’ll have Onyx jobs that require tasks to run on a predetermined set of machines. Tags are a feature that let peers denote "capabilities". Tasks may declare which tags peers must have in order to be selected to execute them.

Peers

To declare a peer as having special capabilities, use a vector of keywords in the Peer Configuration under key :onyx.peer/tags. For example, if you wanted to declare that a peer has a license for its JVM to communicate with Datomic, you might add this to your Peer Configuration:

{...
 :onyx/tenancy-id "my-cluster"
 :onyx.peer/tags [:datomic]
 ...
}

You can specify multiple tags. The default is no tags ([]), in which case this peer can execute any tasks that do not require tags.

Tasks

Now that we have a means for expressing which peers can do which kinds of things, we’ll need a way to express which tasks require which capabilities. We do this in the catalog. Any task can use the key :onyx/required-tags with a vector of keywords as a value. Any peer that executes this task is guaranteed to have :onyx/required-tags as a subset of its :onyx.peer/tags.

For example, to declare that task :read-datoms must be executed by a peer that can talk to Datomic, you might write:

[{:onyx/name :read-datoms
  :onyx/plugin :onyx.plugin.datomic/read-datoms
  :onyx/type :input
  :onyx/medium :datomic
  :onyx/required-tags [:datomic] ;; <- Add this!
  :datomic/uri db-uri
  ...
  :onyx/batch-size batch-size
  :onyx/doc "Reads a sequence of datoms from the d/datoms API"}
 ...
]

Event Subscription

Onyx’s log-based design provides open-ended access to react to all coordination events. This section describes how to tap into these notifications.

Explanation

Onyx uses an internal log to totally order all coordination events across nodes in the cluster. This log is maintained as a directory of sequentially ordered znodes in ZooKeeper. It’s often of interest to watch the events as they are written to the log. For instance, you may want to know when a particular task is completed, or when a peer joins or leaves the cluster. You can use the log subscriber to do just that.

Subscribing to the Log

The following is a complete example that pretty prints all events as they are written to the log. You need to provide the ZooKeeper address, tenancy ID, and shared job scheduler in the peer config. The subscriber will automatically recover from sequential read errors in the case that a garbage collection is triggered, deleting log entries in its path.

(require '[clojure.core.async :refer [chan <!!]])

(def peer-config
  {:zookeeper/address "127.0.0.1:2181"
   :onyx/tenancy-id onyx-id
   :onyx.peer/job-scheduler :onyx.job-scheduler/round-robin})

(def ch (chan 100))

(def subscription (onyx.api/subscribe-to-log peer-config ch))

(def log (:log (:env subscription)))

;; Loops forever
(loop [replica (:replica subscription)]
  (let [entry (<!! ch)
        new-replica (onyx.extensions/apply-log-entry entry replica)]
    (clojure.pprint/pprint new-replica)
    (recur new-replica)))

(onyx.api/shutdown-env (:env subscription))

Some example output from a test, printing the log position, log entry content, and the replica as-of that log entry:

====
Log Entry #0
Entry is {:message-id 0, :fn :prepare-join-cluster, :args {:joiner #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"}}
Replica is:
{:peer-state {#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577" :idle},
 :peers [#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"],
 :job-scheduler :onyx.job-scheduler/greedy}
====
Log Entry #1
Entry is {:message-id 1, :fn :prepare-join-cluster, :args {:joiner #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"}}
Replica is:
{:prepared
 {#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"},
 :peer-state {#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577" :idle},
 :peers [#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"],
 :job-scheduler :onyx.job-scheduler/greedy}
====
Log Entry #2
Entry is {:message-id 2, :fn :notify-join-cluster, :args {:observer #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf", :subject #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"}}
Replica is:
{:accepted
 {#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"},
 :prepared {},
 :peer-state {#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577" :idle},
 :peers [#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"],
 :job-scheduler :onyx.job-scheduler/greedy}
====
Log Entry #3
Entry is {:message-id 3, :fn :accept-join-cluster, :args {:observer #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf", :subject #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577", :accepted-observer #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577", :accepted-joiner #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"}}
Replica is:
{:pairs
 {#uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"
  #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577",
  #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"},
 :accepted {},
 :prepared {},
 :peer-state
 {#uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf" :idle,
  #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577" :idle},
 :peers
 [#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"],
 :job-scheduler :onyx.job-scheduler/greedy}
====
Log Entry #4
Entry is {:message-id 4, :fn :prepare-join-cluster, :args {:joiner #uuid "010a1688-47ff-4055-8da5-1f02247351e1"}}
Replica is:
{:pairs
 {#uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"
  #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577",
  #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"},
 :accepted {},
 :prepared
 {#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
  #uuid "010a1688-47ff-4055-8da5-1f02247351e1"},
 :peer-state
 {#uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf" :idle,
  #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577" :idle},
 :peers
 [#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"],
 :job-scheduler :onyx.job-scheduler/greedy}
====
Log Entry #5
Entry is {:message-id 5, :fn :notify-join-cluster, :args {:observer #uuid "010a1688-47ff-4055-8da5-1f02247351e1", :subject #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"}}
Replica is:
{:pairs
 {#uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"
  #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577",
  #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"},
 :accepted
 {#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
  #uuid "010a1688-47ff-4055-8da5-1f02247351e1"},
 :prepared {},
 :peer-state
 {#uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf" :idle,
  #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577" :idle},
 :peers
 [#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"],
 :job-scheduler :onyx.job-scheduler/greedy}
====
Log Entry #6
Entry is {:message-id 6, :fn :accept-join-cluster, :args {:observer #uuid "010a1688-47ff-4055-8da5-1f02247351e1", :subject #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf", :accepted-observer #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577", :accepted-joiner #uuid "010a1688-47ff-4055-8da5-1f02247351e1"}}
Replica is:
{:pairs
 {#uuid "010a1688-47ff-4055-8da5-1f02247351e1"
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf",
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"
  #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577",
  #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
  #uuid "010a1688-47ff-4055-8da5-1f02247351e1"},
 :accepted {},
 :prepared {},
 :peer-state
 {#uuid "010a1688-47ff-4055-8da5-1f02247351e1" :idle,
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf" :idle,
  #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577" :idle},
 :peers
 [#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"
  #uuid "010a1688-47ff-4055-8da5-1f02247351e1"],
 :job-scheduler :onyx.job-scheduler/greedy}
====
Log Entry #7
Entry is {:message-id 7, :fn :prepare-join-cluster, :args {:joiner #uuid "e6c35131-f4d9-432d-8915-e8616851bb1c"}}
Replica is:
{:pairs
 {#uuid "010a1688-47ff-4055-8da5-1f02247351e1"
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf",
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"
  #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577",
  #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
  #uuid "010a1688-47ff-4055-8da5-1f02247351e1"},
 :accepted {},
 :prepared
 {#uuid "010a1688-47ff-4055-8da5-1f02247351e1"
  #uuid "e6c35131-f4d9-432d-8915-e8616851bb1c"},
 :peer-state
 {#uuid "010a1688-47ff-4055-8da5-1f02247351e1" :idle,
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf" :idle,
  #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577" :idle},
 :peers
 [#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"
  #uuid "010a1688-47ff-4055-8da5-1f02247351e1"],
 :job-scheduler :onyx.job-scheduler/greedy}
====
Log Entry #8
Entry is {:message-id 8, :fn :notify-join-cluster, :args {:observer #uuid "e6c35131-f4d9-432d-8915-e8616851bb1c", :subject #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"}}
Replica is:
{:pairs
 {#uuid "010a1688-47ff-4055-8da5-1f02247351e1"
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf",
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"
  #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577",
  #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
  #uuid "010a1688-47ff-4055-8da5-1f02247351e1"},
 :accepted
 {#uuid "010a1688-47ff-4055-8da5-1f02247351e1"
  #uuid "e6c35131-f4d9-432d-8915-e8616851bb1c"},
 :prepared {},
 :peer-state
 {#uuid "010a1688-47ff-4055-8da5-1f02247351e1" :idle,
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf" :idle,
  #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577" :idle},
 :peers
 [#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"
  #uuid "010a1688-47ff-4055-8da5-1f02247351e1"],
 :job-scheduler :onyx.job-scheduler/greedy}
====
Log Entry #9
Entry is {:message-id 9, :fn :accept-join-cluster, :args {:observer #uuid "e6c35131-f4d9-432d-8915-e8616851bb1c", :subject #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf", :accepted-observer #uuid "010a1688-47ff-4055-8da5-1f02247351e1", :accepted-joiner #uuid "e6c35131-f4d9-432d-8915-e8616851bb1c"}}
Replica is:
{:pairs
 {#uuid "e6c35131-f4d9-432d-8915-e8616851bb1c"
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf",
  #uuid "010a1688-47ff-4055-8da5-1f02247351e1"
  #uuid "e6c35131-f4d9-432d-8915-e8616851bb1c",
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"
  #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577",
  #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
  #uuid "010a1688-47ff-4055-8da5-1f02247351e1"},
 :accepted {},
 :prepared {},
 :peer-state
 {#uuid "e6c35131-f4d9-432d-8915-e8616851bb1c" :idle,
  #uuid "010a1688-47ff-4055-8da5-1f02247351e1" :idle,
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf" :idle,
  #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577" :idle},
 :peers
 [#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"
  #uuid "010a1688-47ff-4055-8da5-1f02247351e1"
  #uuid "e6c35131-f4d9-432d-8915-e8616851bb1c"],
 :job-scheduler :onyx.job-scheduler/greedy}
====
Log Entry #10
Entry is {:message-id 10, :fn :prepare-join-cluster, :args {:joiner #uuid "bf8fd5fc-30fd-424c-af6a-0b32568581a4"}}
Replica is:
{:pairs
 {#uuid "e6c35131-f4d9-432d-8915-e8616851bb1c"
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf",
  #uuid "010a1688-47ff-4055-8da5-1f02247351e1"
  #uuid "e6c35131-f4d9-432d-8915-e8616851bb1c",
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"
  #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577",
  #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
  #uuid "010a1688-47ff-4055-8da5-1f02247351e1"},
 :accepted {},
 :prepared
 {#uuid "010a1688-47ff-4055-8da5-1f02247351e1"
  #uuid "bf8fd5fc-30fd-424c-af6a-0b32568581a4"},
 :peer-state
 {#uuid "e6c35131-f4d9-432d-8915-e8616851bb1c" :idle,
  #uuid "010a1688-47ff-4055-8da5-1f02247351e1" :idle,
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf" :idle,
  #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577" :idle},
 :peers
 [#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"
  #uuid "010a1688-47ff-4055-8da5-1f02247351e1"
  #uuid "e6c35131-f4d9-432d-8915-e8616851bb1c"],
 :job-scheduler :onyx.job-scheduler/greedy}
====
Log Entry #11
Entry is {:message-id 11, :fn :notify-join-cluster, :args {:observer #uuid "bf8fd5fc-30fd-424c-af6a-0b32568581a4", :subject #uuid "e6c35131-f4d9-432d-8915-e8616851bb1c"}}
Replica is:
{:pairs
 {#uuid "e6c35131-f4d9-432d-8915-e8616851bb1c"
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf",
  #uuid "010a1688-47ff-4055-8da5-1f02247351e1"
  #uuid "e6c35131-f4d9-432d-8915-e8616851bb1c",
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"
  #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577",
  #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
  #uuid "010a1688-47ff-4055-8da5-1f02247351e1"},
 :accepted
 {#uuid "010a1688-47ff-4055-8da5-1f02247351e1"
  #uuid "bf8fd5fc-30fd-424c-af6a-0b32568581a4"},
 :prepared {},
 :peer-state
 {#uuid "e6c35131-f4d9-432d-8915-e8616851bb1c" :idle,
  #uuid "010a1688-47ff-4055-8da5-1f02247351e1" :idle,
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf" :idle,
  #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577" :idle},
 :peers
 [#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"
  #uuid "010a1688-47ff-4055-8da5-1f02247351e1"
  #uuid "e6c35131-f4d9-432d-8915-e8616851bb1c"],
 :job-scheduler :onyx.job-scheduler/greedy}
====
Log Entry #12
Entry is {:message-id 12, :fn :accept-join-cluster, :args {:observer #uuid "bf8fd5fc-30fd-424c-af6a-0b32568581a4", :subject #uuid "e6c35131-f4d9-432d-8915-e8616851bb1c", :accepted-observer #uuid "010a1688-47ff-4055-8da5-1f02247351e1", :accepted-joiner #uuid "bf8fd5fc-30fd-424c-af6a-0b32568581a4"}}
Replica is:
{:pairs
 {#uuid "bf8fd5fc-30fd-424c-af6a-0b32568581a4"
  #uuid "e6c35131-f4d9-432d-8915-e8616851bb1c",
  #uuid "e6c35131-f4d9-432d-8915-e8616851bb1c"
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf",
  #uuid "010a1688-47ff-4055-8da5-1f02247351e1"
  #uuid "bf8fd5fc-30fd-424c-af6a-0b32568581a4",
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"
  #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577",
  #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
  #uuid "010a1688-47ff-4055-8da5-1f02247351e1"},
 :accepted {},
 :prepared {},
 :peer-state
 {#uuid "bf8fd5fc-30fd-424c-af6a-0b32568581a4" :idle,
  #uuid "e6c35131-f4d9-432d-8915-e8616851bb1c" :idle,
  #uuid "010a1688-47ff-4055-8da5-1f02247351e1" :idle,
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf" :idle,
  #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577" :idle},
 :peers
 [#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"
  #uuid "010a1688-47ff-4055-8da5-1f02247351e1"
  #uuid "e6c35131-f4d9-432d-8915-e8616851bb1c"
  #uuid "bf8fd5fc-30fd-424c-af6a-0b32568581a4"
],
 :job-scheduler :onyx.job-scheduler/greedy}
====
Log Entry #13
Entry is {:message-id 13, :fn :submit-job, :args {:id #uuid "b784ebb4-356f-4e16-8eac-60e051d69ab7", :tasks [#uuid "ce13205e-937b-4af6-9aa9-d5149b31fb2c" #uuid "948f8595-3a0a-4318-b128-91c1d22c0158" #uuid "fb86b977-d668-4c98-abaa-80ee0d29663a"], :task-scheduler :onyx.task-scheduler/round-robin, :saturation Infinity, :task-saturation {#uuid "ce13205e-937b-4af6-9aa9-d5149b31fb2c" Infinity, #uuid "948f8595-3a0a-4318-b128-91c1d22c0158" Infinity, #uuid "fb86b977-d668-4c98-abaa-80ee0d29663a" Infinity}}}
Replica is:
{:job-scheduler :onyx.job-scheduler/greedy,
 :saturation {#uuid "b784ebb4-356f-4e16-8eac-60e051d69ab7" Infinity},
 :peers
 [#uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"
  #uuid "010a1688-47ff-4055-8da5-1f02247351e1"
  #uuid "e6c35131-f4d9-432d-8915-e8616851bb1c"
  #uuid "bf8fd5fc-30fd-424c-af6a-0b32568581a4"],
 :accepted {},
 :jobs [#uuid "b784ebb4-356f-4e16-8eac-60e051d69ab7"],
 :tasks
 {#uuid "b784ebb4-356f-4e16-8eac-60e051d69ab7"
  [#uuid "ce13205e-937b-4af6-9aa9-d5149b31fb2c"
   #uuid "948f8595-3a0a-4318-b128-91c1d22c0158"
   #uuid "fb86b977-d668-4c98-abaa-80ee0d29663a"]},
 :pairs
 {#uuid "bf8fd5fc-30fd-424c-af6a-0b32568581a4"
  #uuid "e6c35131-f4d9-432d-8915-e8616851bb1c",
  #uuid "e6c35131-f4d9-432d-8915-e8616851bb1c"
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf",
  #uuid "010a1688-47ff-4055-8da5-1f02247351e1"
  #uuid "bf8fd5fc-30fd-424c-af6a-0b32568581a4",
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf"
  #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577",
  #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577"

  #uuid "010a1688-47ff-4055-8da5-1f02247351e1"},
 :allocations {#uuid "b784ebb4-356f-4e16-8eac-60e051d69ab7" {}},
 :prepared {},
 :peer-state
 {#uuid "bf8fd5fc-30fd-424c-af6a-0b32568581a4" :idle,
  #uuid "e6c35131-f4d9-432d-8915-e8616851bb1c" :idle,
  #uuid "010a1688-47ff-4055-8da5-1f02247351e1" :idle,
  #uuid "7ad37c45-ce67-4fd4-8850-f3ec58ede0bf" :idle,
  #uuid "b7e5d564-02a3-46d3-863f-c4a2bac7e577" :idle},
 :task-schedulers
 {#uuid "b784ebb4-356f-4e16-8eac-60e051d69ab7"
  :onyx.task-scheduler/round-robin},
 :task-saturation
 {#uuid "b784ebb4-356f-4e16-8eac-60e051d69ab7"
  {#uuid "ce13205e-937b-4af6-9aa9-d5149b31fb2c" Infinity,
   #uuid "948f8595-3a0a-4318-b128-91c1d22c0158" Infinity,
   #uuid "fb86b977-d668-4c98-abaa-80ee0d29663a" Infinity}}}
====
...

Plugins

Plugins serve as an abstraction for composing mechanisms that get data in and out of Onyx. See the README.md of the project for a list of official Onyx plugins, or keep reading to roll your own.

Interfaces

In order to implement a plugin, one or more protocols need to be implemented from the Pipeline Extensions API. Reader plugins will implement PipelineInput and Pipeline. Writer plugins will implement Pipeline. See the docstrings for instructions on implementation.

Templates

To help move past the boilerplate of creating new plugins, use Leiningen with onyx-plugin to generate a template.

Coordination within Plugins

Often, the virtual peers allocated to a task need to coordinate with respect to allocating work. For example, a Kafka reader task may need to assign partitions to different peers on the same topic. The Onyx mechanism for coordinating peers is the log. The Onyx log is extensible by plugins through implementing several onyx.extensions defmethods.

For example:

(ns your.plugin.log-commands
  (:require [onyx.extensions :as extensions]))


(defmethod extensions/apply-log-entry :yourplugin/coordination-type
  [{:keys [args]} replica]
  replica)

(defmethod extensions/replica-diff :yourplugin/coordination-type
  [{:keys [args]} old new]
  {})


(defmethod extensions/reactions :yourplugin/coordination-type
  [{:keys [args]} old new diff peer-args]
  [])

(defmethod extensions/fire-side-effects! :yourplugin/coordination-type
  [{:keys [args]} old new diff {:keys [monitoring] :as state}]
  state)

When modifying the replica, please assoc-in into replica under [:task-metadata job-id task-id], so that it will be cleaned up when the job is completed or killed.
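
For example, a variant of the apply-log-entry stub above that stores a partition assignment under the recommended path might look like this sketch; the job-id, task-id, and assignment keys inside args are assumptions about your own entry format.

(defmethod extensions/apply-log-entry :yourplugin/coordination-type
  [{:keys [args]} replica]
  (let [{:keys [job-id task-id assignment]} args]
    ;; stored under [:task-metadata job-id task-id] so it is cleaned up with the job
    (assoc-in replica [:task-metadata job-id task-id] assignment)))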

onyx-core-async

Onyx plugin providing read and write facilities for Clojure core.async.

Installation

This plugin is included with Onyx. You do not need to add it as a separate dependency.

In your peer boot-up namespace:

(:require [onyx.plugin.core-async])

Functions

read-from-chan

Catalog entry:

{:onyx/name :in
 :onyx/plugin :onyx.plugin.core-async/input
 :onyx/type :input
 :onyx/medium :core.async
 :onyx/batch-size batch-size
 :onyx/max-peers 1
 :onyx/doc "Reads segments from a core.async channel"}

Lifecycle entries:

[{:lifecycle/task :your-task-name
  :lifecycle/calls :my.ns/in-calls}
 {:lifecycle/task :your-task-name
  :lifecycle/calls :onyx.plugin.core-async/reader-calls}]

There’s a little extra baggage with core.async because you need a reference to the channel. Make sure that my.ns/in-calls is a map that references a function to inject the channel in:

(def in-chan (chan capacity))

(defn inject-in-ch [event lifecycle]
  {:core.async/chan in-chan})

(def in-calls
  {:lifecycle/before-task-start inject-in-ch})
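
To feed the job during a test, you might then write a few segments onto the channel and finish with the :done sentinel (the segment shapes are illustrative):

(require '[clojure.core.async :refer [>!! close!]])

(>!! in-chan {:n 1})
(>!! in-chan {:n 2})
(>!! in-chan :done)
(close! in-chan)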

write-to-chan

Catalog entry:

{:onyx/name :out
 :onyx/plugin :onyx.plugin.core-async/output
 :onyx/type :output
 :onyx/medium :core.async
 :onyx/batch-size batch-size
 :onyx/max-peers 1
 :onyx/doc "Writes segments to a core.async channel"}

Lifecycle entries:

[{:lifecycle/task :your-task-name
  :lifecycle/calls :my.ns/out-calls}
 {:lifecycle/task :your-task-name
  :lifecycle/calls :onyx.plugin.core-async/writer-calls}]

Again, as with read-from-chan, there’s a little extra to do since core.async has some exceptional behavior compared to other plugins:

(def out-chan (chan capacity))

(defn inject-out-ch [event lifecycle]
  {:core.async/chan out-chan})

(def out-calls
  {:lifecycle/before-task-start inject-out-ch})

Utility Functions

take-segments!

This additional function is provided as a utility for removing segments from a channel until :done is found. After :done is encountered, all prior segments, including :done, are returned in a seq.
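
For example, after submitting a job that writes to the out-chan defined above, you might drain the results with a sketch like this:

(def results (onyx.plugin.core-async/take-segments! out-chan))
;; results is a seq of all segments written by the job, ending with :done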

Logging

This chapter details how to inspect and modify the logs that Onyx produces.

Timbre

Onyx uses Timbre for logging.

By default, all Onyx output is logged to a file called onyx.log in the same directory as where the peer or coordinator jar is executing. The logging configuration can be overridden completely, see below.

Onyx’s default logging configuration also writes all WARN and FATAL messages to standard out.

Overriding the log file name and path

In order to override the log file location add :onyx.log/file to both the environment config sent into start-env as well as the peer config sent into start-peer-group.

Both relative and absolute paths are supported by Timbre.

Overriding the Timbre log config

Similarly, to override the full Timbre log config map, construct the Timbre configuration and add it to both the environment and peer config maps under the :onyx.log/config key. Note that the onyx.log/config map will be merged with the existing Timbre configuration rather than replacing it completely. In practice this means that extra configuration must be sent in to, for example, disable appenders that are enabled by default. See the examples below.

Examples

The following example simply changes the output file.

(let [log-config {:onyx.log/file "/var/log/onyx.log"}
      peer-config (merge my-peer-config log-config)
      env-config (merge my-env-config log-config)]
  (onyx.api/start-env env-config)
  (onyx.api/start-peer-group peer-config)

  ;; ...
  )

This example uses Timbre to redirect Onyx logs into the regular Java logging system using the log-config library.

(require '[com.palletops.log-config.timbre.tools-logging :as tl])
(let [log-config {:onyx.log/config {:appenders
                                    {:spit {:enabled? false}
                                     :standard-out {:enabled? false}
                                     :rotor {:enabled? false}
                                     :jl (tl/make-tools-logging-appender
                                          {:enabled? true
                                           :fmt-output-opts {:nofonts? true}})}
                                    :min-level :trace}}
      peer-config (merge my-peer-config log-config)
      env-config (merge my-env-config log-config)]
  (onyx/start-env env-config)
  (onyx/start-peer-group peer-config)

  ;; ...
  )

If you already have timbre logging configured somewhere in your code base, you can specify :onyx.log/config {} to ensure your settings remain unchanged. In effect, this simply merges in the empty map into whatever settings you may have already specified for logging.

(let [log-config {:onyx.log/config {}}
      peer-config (merge my-peer-config log-config)
      env-config (merge my-env-config log-config)]
  (onyx.api/start-env env-config)
  (onyx.api/start-peer-group peer-config)

  ;; ...
  )

See the Timbre example configuration for more information on valid values for the :onyx.log/config map.

Examples

Examples Project

See the Examples Project for a set of self-contained demonstrations of specific Onyx functionality.

Frequently Asked Questions

My job’s tasks never start

When your job’s tasks don’t start, you won’t see any messages in onyx.log on the peer that read something like:

Job fb2f2b80-2c5a-41b6-93c8-df35bffe6915 {:job-name :click-stream} - Task da5795a6-bd4c-41a1-843d-dda84cf5c615 :inc - Peer 2b35433a-b935-43b7-881b-4f4ec16672cc - Warming up task lifecycle {:id #uuid "da5795a6-bd4c-41a1-843d-dda84cf5c615", :name :inc, :egress-ids {:out #uuid "2c210c3e-6411-45fb-85a7-6cc733118215"}}

Instead, the log would show peers starting, but not doing anything. This looks something like:

Starting Virtual Peer 111dcc44-7f53-41a4-8548-047803e8d441

Resolutions

Do you have enough virtual peers provisioned?

When a job is submitted to Onyx, it will only begin when there are enough virtual peers to sustain its execution. How many are enough? That depends. One virtual peer may work on at most one task at any given time. By default, each task in your workflow requires one available virtual peer to run it. If a grouping task is used, or if you set :onyx/min-peers on the task’s catalog entry, the number may be greater than 1. You can statically determine how many peers a job needs for execution with this simple helper function available in Onyx core.

As an example, if you submit a job with 10 tasks in its workflow, you need at least 10 available virtual peers for your job to begin running. If you only have 8, nothing will happen. If you have more than 10 (assuming that 10 is truly the minimum that you need), Onyx may attempt to dedicate more resources to your job beyond 10 peers, depending on which scheduler you used. You can control the number of virtual peers that you’re running by modifying the parameters to onyx.api/start-peers.

We don’t emit a warning that your cluster is underprovisioned in terms of number of peers because its masterless design makes this difficult to asynchronously detect. This has been discussed in an issue. Onyx is designed to handle under and over provisioning as part of its normal flow, so this is not considered an error condition.

See the Scheduling chapter of the User Guide for a full explanation.

Are you using the Greedy job scheduler?

You’ve hit this case if you have more than one job running concurrently. When you start your peers, you specified a job scheduler through the configuration key :onyx.peer/job-scheduler. If you specified :onyx.job-scheduler/greedy, you’ve asked that Onyx dedicate all cluster resources to the first submitted job until that job is completed. When that job is completed, all resources will be assigned to the next available job, and so on. If you want more than one job running at a time, consider using the Balanced (:onyx.job-scheduler/balanced) or Percentage (:onyx.job-scheduler/percentage) job schedulers. See the Scheduling chapter of this User Guide for a full explanation.

Peers repeatedly echo that they’re warming up

If you see something like the following repeatedly being written to onyx.log, you’re in the right place:

Job fb2f2b80-2c5a-41b6-93c8-df35bffe6915 {:job-name :click-stream} - Task 2c210c3e-6411-45fb-85a7-6cc733118215 :out - Peer 3b2c6667-8f41-47a9-ba6b-f97c81ade828 - Peer chose not to start the task yet. Backing off and retrying...

What’s happening here is that your job’s tasks have started execution. Peers have been assigned tasks, and are beginning to get things in order to perform work on them. Before a peer can truly begin running a task, it must do three things:

  1. Call all lifecycle hooks registered under lifecycle/start-task? sequentially, accruing their results

  2. Call the constructor for the input or output plugin, if this is an input or output task.

  3. Emit a signal back to ZooKeeper indicating that this peer has opened up all of its sockets to receiving incoming communication from other peers.

The results from the first step are a sequence of boolean values designating whether the peer is allowed to start this task. If any of the start-task? lifecycle calls return false, the peer sleeps for a short period to back off, then performs step 1 again. This process continues until all lifecycle calls return true. Read the Lifecycles chapter of this User Guide to learn why it’s useful to repeatedly retry and back off from task launching.

The second step will only occur after step 1 successfully completes - that is, all lifecycle functions return true. The reason you see "Backing off and retrying…​" in the logs is because step 1 is being repeated again.

Resolutions

Do all your :lifecycle/start-task? functions eventually return

true?

If any of your lifecycle hooks for :lifecycle/start-task? return false or nil, you’ll want to change them to eventually return true based upon some condition.

Are the connections that your plugin opens valid?

As per step 2, a plugin’s constructor will be called to open any relevant stateful connections, such as a database or socket connection. Many connection calls, such as JDBC SQL connections, will block for prolonged periods of time before failing unless otherwise configured. If the task that appears to be stuck is an input or output task, this is likely the cause. You may want to reconfigure your initial connections to fail faster to make it more obvious what’s happening.

None of my messages are being processed

If your onyx.log shows messages that read as follows, your job’s tasks have successfully started:

Job fb2f2b80-2c5a-41b6-93c8-df35bffe6915 {:job-name :click-stream} - Task da5795a6-bd4c-41a1-843d-dda84cf5c615 :inc - Peer 2b35433a-b935-43b7-881b-4f4ec16672cc - Enough peers are active, starting the task

If you suspect that messages are not being processed, it heavily depends on the input plugin that you’re using.

Resolutions

Try using :onyx/fn to log all incoming messages

One thing that you can do for extra visibility is to log all incoming messages from input tasks. This is inadvisable for production, but can be useful for temporary debugging. You can specify an :onyx/fn transformation function to any task, including inputs and outputs. It can be useful to specify a debug function on your input tasks to see which messages are entering the system. Be sure that this function returns the original segment! For example, you can define a function:

(defn spy [segment]
  (println "Read from input:" segment)
  segment)

Then add :onyx/fn ::spy to your input catalog entries.
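
A sketch of what such a catalog entry might look like, assuming a hypothetical core.async input task named :in; treat the plugin-specific keys as placeholders for whatever your input entry already contains:

{:onyx/name :in
 :onyx/plugin :onyx.plugin.core-async/input
 :onyx/type :input
 :onyx/medium :core.async
 :onyx/fn ::spy                ; logs each segment, then passes it through unchanged
 :onyx/batch-size 10
 :onyx/max-peers 1
 :onyx/doc "Reads segments from a core.async channel"}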

Has your job been killed?

Unless otherwise configured by your Lifecycles, if any user-level code throws an exception, the job will be killed and is no longer eligible for execution. Check onyx.log on all your peers to ensure that no exceptions were thrown. If this were the case, you’d see messages lower in the log reading:

Job fb2f2b80-2c5a-41b6-93c8-df35bffe6915 {:job-name :click-stream} - Task 1e83e005-3e2d-4307-907b-a3d66e3aa293 :in - Peer 111dcc44-7f53-41a4-8548-047803e8d441 - Stopping task lifecycle

Are you using the Kafka input plugin?

If you’re using the Kafka input plugin, make sure that you’re reading from a reasonable starting offset of the topic. If you’ve set :kafka/force-reset? to true in the catalog entry, and you’ve also set :kafka/offset-reset to :largest, you’ve instructed Onyx to begin reading messages from the end of the topic. Until you place more messages into the topic, Onyx will sit idle waiting for more input. The starting offset for each input task using Kafka is echoed out in onyx.log.
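
For reference, a sketch of the relevant portion of a Kafka input catalog entry; the task name and topic are hypothetical, and you should check the onyx-kafka documentation for the full set of options in your plugin version:

{:onyx/name :read-clicks
 :onyx/type :input
 :kafka/topic "clicks"
 :kafka/force-reset? false      ; keep the consumer's stored offset
 :kafka/offset-reset :smallest  ; if resetting, start from the beginning of the topic
 ;; ... the rest of your onyx-kafka catalog entry ...
 :onyx/batch-size 10}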

The same messages are being replayed multiple times

Message replay happens when a message enters Onyx from an input source, gets processed, and is seen again at a later point in time. Onyx replays messages for fault tolerance when it suspects that a failure of some sort has occurred. You can read about how message replay is implemented, and why it exists, in the Architecture chapter of this User Guide.

There are many reasons why a message may need to be replayed (every possible failure scenario), so we will limit our discussion to controlling replay frequency. See the performance tuning sections of this document for more context about what value is appropriate to set for the replay frequency.

Resolutions

Is your :onyx/pending-timeout too low?

Messages are replayed from the input source if they do not complete their route through the cluster within a particular period of time. This period is controlled by the :onyx/pending-timeout parameter of the catalog entry, and its default is 60 seconds. You can read about its specifics in the Cheatsheet. You should set this value high enough such that any segment taking longer than this value to complete is highly likely to have encountered a failure scenario.
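
As a sketch, the timeout is raised on the input task’s catalog entry; the value below is only an illustration, and assumes the parameter is specified in milliseconds as described in the Cheatsheet:

{:onyx/name :in
 :onyx/type :input
 ;; ... the rest of your input catalog entry ...
 :onyx/pending-timeout 120000}  ; 120 seconds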

My program starts running, but then it stalls

Programs that begin healthy by processing messages and then stall are typically indicative of user-level code problems. We outline a few common cases here.

Resolutions

Does onyx.log have any exceptions in it?

Most exceptions will kill the job in question. If you are simply monitoring progress by reading from an output data source through Onyx, you should check all of the peer onyx.log files for exceptions that may have killed the job.

Are any user-level functions blocking?

Any implementation of :onyx/fn that blocks will halt progress of all other segments lined up directly behind it. Ensure that user-level functions finish in a timely manner.

Are messages being replayed?

To get started, see the full section on how and why messages are replayed. In short, messages will be replayed after 60 seconds (the default :onyx/pending-timeout) if they have not been completed. You may be experiencing initial success, followed by a runtime error that temporarily loses segments until they are replayed.

Are you using a core.async output plugin?

If you’re using a core.async output plugin writing to a channel that blocks writes when its buffer is full, you may have produced enough messages to fill the channel, so core.async writes are now blocking and hence stalling Onyx.
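
A minimal sketch of one way to keep such a channel drained during development, assuming out-chan is the channel your output catalog entry writes to:

(require '[clojure.core.async :as a])

(def out-chan (a/chan 100))  ; hypothetical output channel with a small buffer

;; Consume segments in the background so writers never block on a full buffer.
(a/go-loop []
  (when-let [segment (a/<! out-chan)]
    (println "Got output segment:" segment)
    (recur)))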

Are your peer hosts and ports advertised correctly?

Ensure that the host and port that the peer advertises to the rest of the cluster for incoming connections are correct. If they are incorrect, only tasks that are colocated on the same machine will have a chance of working. Remember that Onyx’s messaging runs over UDP, so make sure that your security settings allow traffic through that protocol on the advertised port.

The host is configured via the :onyx.messaging/bind-addr key, and the port is configured via the :onyx.messaging/peer-port key.
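
A sketch of the relevant peer configuration keys, with placeholder values:

(def peer-config
  {;; ... the rest of your peer configuration ...
   :onyx.messaging/bind-addr "10.0.0.5"  ; an address reachable by the other peers
   :onyx.messaging/peer-port 40200})     ; a UDP port open to the rest of the cluster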

Peer fails to start, and throws

java.io.IOException: No space left on device

This exception commonly occurs when running Onyx inside of a Docker container. Aeron requires more shared memory than the container allocates by default. You can solve this problem by starting your container with a larger amount of shared memory by specifying --shm-size on Docker >= 1.10.

Aeron Mediadriver crashes the JVM with SIGBUS

This exception can occur when Aeron does not have enough shared memory. Increase the amount of shared memory that is set as described above.

Peer fails to start, and throws

org.apache.bookkeeper.bookie.BookieException$InvalidCookieException: Cookie

This exception occurs due to a bug where BookKeeper reconnects to ZooKeeper before its ephemeral node expires. We are currently surveying our own workarounds until this is patched, but for now the thing to do is to delete /tmp/bookkeeper_journal and /tmp/bookkeeper_ledger on the host. Restart the peer, and all will be well.

Peer fails to start, and throws

java.lang.IllegalStateException: aeron cnc file version not understood

This exception occurs when Aeron is upgraded or downgraded between incompatible versions. The exception will also provide a path on the OS to some Aeron files. Shut down the peer, delete that directory, then restart the peer.

Peer fails to start, and throws

Failed to connect to the Media Driver - is it currently running?

This message is thrown when the peer tries to start, but can’t engage Aeron in its local environment. Aeron can be run in embedded mode by switching :onyx.messaging.aeron/embedded-driver? to true, or by running it out of process on the peer machine, which is the recommended production setting. If you’re running it out of process, ensure that it didn’t go down when you encounter this message. You should run Aeron through a process monitoring tool such as monit when running it out of process.
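
For development, a sketch of enabling the embedded driver in the peer configuration (running the media driver out of process remains the recommended production setup):

(def peer-config
  {;; ... the rest of your peer configuration ...
   :onyx.messaging/impl :aeron
   :onyx.messaging.aeron/embedded-driver? true})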

Peer fails to start, and throws

uk.co.real_logic.aeron.driver.exceptions.ActiveDriverException: active driver detected

You have encountered the following exception:

uk.co.real_logic.aeron.driver.exceptions.ActiveDriverException: active driver detected
  clojure.lang.ExceptionInfo: Error in component :messaging-group in system onyx.system.OnyxPeerGroup calling #'com.stuartsierra.component/start

This is because you have started your peer-group twice without shutting it down. Alternatively, you may be using :onyx.messaging.aeron/embedded-driver? true in your peer-group and starting a media driver externally. Only one media driver can be started at a time.

Application fails to build an uberjar, and throws

java.lang.UnsupportedClassVersionError: uk.co.real_logic.aeron.Aeron$Context : Unsupported major.minor version 52.0

You have encountered the following exception:

java.lang.UnsupportedClassVersionError: uk.co.real_logic.aeron.Aeron$Context : Unsupported major.minor version 52.0

This is because you are trying to build or run an Onyx app with a JRE version lower than 1.8; class file version 52.0 corresponds to Java 8. Onyx supports Java 1.8 only.

Peer fails to start, and throws

org.apache.bookkeeper.proto.WriteEntryProcessorV3: Error writing entry:X to ledger:Y

You have encountered the following exception:

2015-12-16 16:59:35 ERROR org.apache.bookkeeper.proto.WriteEntryProcessorV3: Error writing entry:0 to ledger:2
org.apache.bookkeeper.bookie.Bookie$NoLedgerException: Ledger 2 not found

Your ZooKeeper directory has been cleared out of information that points to the BookKeeper servers, and the two processes can’t sync up. This can be fixed by removing the data directory from the BookKeeper servers and ZooKeeper servers.

My program begins running, but throws

No implementation of method: :read-char of protocol: #'clojure.tools.reader.reader-types/Reader found for class

You’ll encounter this exception when your :onyx/fn returns something that is not EDN and Nippy serializable, which is required to send it over the network. Ensure that your :onyx/fn returns either a map or a vector of maps, and that all values within are EDN serializable.

What does Onyx use internally for compression by default?

Unless otherwise overridden in the Peer Pipeline API, Onyx will use Nippy. This can be overridden by setting the peer configuration keys :onyx.messaging/compress-fn and :onyx.messaging/decompress-fn. See the Information Model documentation for more information.

How can I filter segments from being output from my tasks?

Use Flow Conditions, described in the Flow Conditions chapter of this User Guide, or return an empty vector from your :onyx/fn.

Can I return more than one segment from a function?

Return a vector of maps from :onyx/fn instead of a map. All maps at the top level of the vector will be unrolled and pushed downstream.
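
For example, a small sketch of a function that splits one incoming segment into several; the segment shape is hypothetical:

(require '[clojure.string :as str])

(defn explode-line [{:keys [line]}]
  ;; Emits one segment per word; every map in the returned vector is pushed downstream.
  (mapv (fn [word] {:word word}) (str/split line #"\s+")))

;; (explode-line {:line "hello onyx world"})
;; => [{:word "hello"} {:word "onyx"} {:word "world"}]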

Should I be worried about user-level KeeperException in ZooKeeper logs?

You should monitor these; however, entries with KeeperErrorCode = NodeExists are probably fine:

2015-11-05 15:12:51,332 [myid:] - INFO  [ProcessThread(sid:0 cport:-1)::PrepRequestProcessor@645] - Got user-level KeeperException when processing sessionid:0x150d67d0cd10003 type:create cxid:0xa zxid:0x50 txntype:-1 reqpath:n/a Error Path:/onyx/0e14715d-51b9-4e2b-af68-d5292f276afc/windows Error:KeeperErrorCode = NodeExists for /onyx/0e14715d-51b9-4e2b-af68-d5292f276afc/windows

This is a peer just trying to recreate a ZooKeeper path that was already created by another peer, and it can be safely ignored.

How should I benchmark on a single machine?

Turn off messaging short circuiting, since it improves performance in a way that is unrealistic for multi-node use. Remember to turn messaging short circuiting back on for production use, as it does improve performance overall.
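
Assuming the configuration key has not changed in your version (check the Cheatsheet to confirm), short circuiting is controlled from the peer configuration:

(def peer-config
  {;; ... the rest of your peer configuration ...
   :onyx.messaging/allow-short-circuit? false})  ; benchmarking only; set back to true in production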