I’m happy to announce the release of Onyx 0.8.4. This release includes our next-generation scheduling component, largely backed by an independent library named BtrPlace. Today we’ll be introducing a new feature called Task Colocation, an advanced scheduling technique that uses information about the physical layout of your cluster to optimize the allocation of work units. First, let’s talk about why something like this is warranted.

A Distributed System

Much of the challenge in designing a distributed computation platform, such as Onyx, comes from giving users the ability to feel as if the programs they construct are locally executable, when in fact these programs must be transparently deployed to a cluster of machines. The designer needs to walk a fine line with the API when creating this abstraction. If the interfaces are too opaque, the user will have no leverage when diagnosing real problems in production that arise as a result of distribution and stress. If the interfaces are too transparent, the developer spends markedly less time focusing on their application, and is consequently less productive. Real production deployments of high-throughput streaming applications need to give strong consideration to what’s happening on the bare metal. Specifically, we’re going to consider one of the properties of Onyx that affects latency, and consequently overall performance. To do this we need to go back to the basics.

In the literature, a distributed system is classically defined as a set of processes with two properties. First, processes may communicate only by sending and receiving messages to one another. Second, processes do not share state. The important thing to remember when using Onyx is that processes (more specifically peers) emit data (more specifically segments) to one another between tasks over the network. A job with more tasks implies more network activity.

For many jobs, it’s advantageous to spread the execution of tasks for a job across different machines. It’s also typically the case that tasks are CPU bound. That is, the user-level function that underpins the task accounts for the majority of the execution time as data moves throughout the workflow. In these cases, it is most beneficial for machines to execute a heterogeneous mixture of tasks to deal with the variance. For example, below we show the configuration of 3 machines in an Onyx cluster. Each machine has 3 peers, making a total of 9 peers. Two jobs with 3 tasks each are deployed. Job 1 is allocated 6 peers, and job 2 is allocated 3 peers. The dispersion of tasks for a job across a variety of machines is usually desirable.

Shuffled

The tasks of some jobs, however, are fast enough that the networking operations that transfer segments from peer to peer become the bottleneck. When a segment moves from one peer to the next, it is first serialized. By default, we serialize data to the Nippy format. Next, the segment is transmitted using Aeron, a very high performance message transport library. If you’ve ever spent time around the Hadoop ecosystem, you’ve almost certainly heard of certain workloads being slowed down by serialization. Spark has made drastic performance improvements by fusing execution units together, which avoids serialization and writing to disk altogether at the cost of longer recovery during a failure scenario. Onyx now includes Task Colocation to achieve similar performance gains.

Task Colocation

Starting in Onyx 0.8.4, you can set your :task-scheduler to :onyx.task-scheduler/colocated on a per-job basis. The Colocation Scheduler takes all of the tasks for a job and, if possible, assigns them to the peers on a single physical machine. The result is that segments never cross the network, and therefore never need to be serialized. Jobs that are network bound will see a dramatic gain in performance by switching to the colocation scheduler. In effect, this is analogous to “transducers at scale”.
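
As a rough illustration of the per-job setting, the sketch below shows a job map with the colocated scheduler selected. The workflow and task names are hypothetical, and the catalog and lifecycle entries are left empty here; a real job would need to supply them.

```clojure
;; A minimal sketch of a job map, assuming a three-task workflow.
;; The task names (:in, :inc, :out) are hypothetical placeholders.
(def job
  {:workflow [[:in :inc] [:inc :out]]
   ;; Catalog and lifecycle entries are omitted in this sketch;
   ;; a real job must describe each task here.
   :catalog []
   :lifecycles []
   ;; Select the colocation scheduler for this job only.
   :task-scheduler :onyx.task-scheduler/colocated})
```

Because the scheduler is chosen per job, other jobs running on the same cluster can keep whichever task scheduler they already use.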

More specifically, if a job has M tasks, the scheduler will choose as many machines as it can that have enough capacity (at least M peers) to run the job entirely locally. The jobs shown above are reconfigured using the colocation scheduler:

Shuffled
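
The machine-selection rule described above can be sketched as a small pure function. This is only an illustration of the idea, not Onyx's actual scheduler code: the function name `colocate`, the shape of its arguments, and the machine and peer names are all assumptions made for the example.

```clojure
;; Hypothetical helper, not part of Onyx.
;; machine->peers: map of machine id -> vector of peer ids on that machine.
;; tasks: vector of the job's task names (so M = (count tasks)).
;; Returns a map of machine id -> {task peer} for every machine that has
;; at least M peers, so each chosen machine runs the whole job locally.
(defn colocate
  [machine->peers tasks]
  (let [m (count tasks)]
    (into {}
          (for [[machine peers] machine->peers
                :when (>= (count peers) m)]
            ;; Pair each task with one peer on this machine.
            [machine (zipmap tasks (take m peers))]))))

;; Example: only machines with at least 3 peers are selected.
(colocate {:machine-a [:p1 :p2 :p3]
           :machine-b [:p4 :p5]
           :machine-c [:p6 :p7 :p8 :p9]}
          [:in :inc :out])
;; => {:machine-a {:in :p1, :inc :p2, :out :p3}
;;     :machine-c {:in :p6, :inc :p7, :out :p8}}
```

In this sketch a machine with fewer than M peers is simply skipped, and any peers beyond the first M on a chosen machine are left unassigned; how the real scheduler treats spare peers is not covered here.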

Can it Still Handle a Fault?

If all the work for a job is being done on a single machine, a fair question to ask is - can it still handle a fault? The answer is yes - though a crashed machine will cause more previously completed work to be replayed. Onyx’s safety property remains intact because its streaming engine pushes message acknowledgment into the input storage medium layer.