onyx.peer.coordinator

->PeerCoordinator

(->PeerCoordinator workflow resume-point log messenger-group peer-config job-config peer-id job-id monitoring messenger group-ch allocation-ch shutdown-ch coordinator-thread)
Positional factory function for class onyx.peer.coordinator.PeerCoordinator.

barrier-period

(barrier-period job-config peer-config)

check-peer-timeout!

(check-peer-timeout! {:keys [messenger subscriber-liveness-timeout], :as state})

checkpoint-complete?

(checkpoint-complete? {:keys [initiated? epoch], :as checkpoint} status)

complete-job

(complete-job {:keys [tenancy-id log job-id messenger checkpoint group-ch curr-replica], :as state})

completed-checkpoint

(completed-checkpoint {:keys [checkpoint messenger job-id tenancy-id log curr-replica], :as state})

Coordinator

protocol

members

next-state

(next-state this old-replica new-replica)

start

(start this)

started?

(started? this)

stop

(stop this scheduler-event)

coordinator-backoff-ms

coordinator-iteration

(coordinator-iteration {:keys [messenger checkpoint last-heartbeat-time allocation-ch shutdown-ch barrier job job-id curr-replica subscriber-liveness-timeout], :as state} coordinator-max-sleep-ns barrier-period-ns heartbeat-ns)

emit-replica

(emit-replica {:keys [allocation-ch], :as coordinator} replica)

emit-seal-barrier

(emit-seal-barrier state)

evict-peer!

(evict-peer! {:keys [group-ch evicted curr-replica], :as state} peer-id)

initialise-state

(initialise-state {:keys [log messenger job-id tenancy-id curr-replica], :as state})

input-publications

(input-publications {:keys [peer-sites message-short-ids], :as replica} peer-config peer-id job-id)

make-task-data

(make-task-data replica job-id)

map->PeerCoordinator

(map->PeerCoordinator m__6522__auto__)
Factory function for class onyx.peer.coordinator.PeerCoordinator, taking a map of keywords to field values.

merged-statuses

(merged-statuses messenger)

new-peer-coordinator

(new-peer-coordinator workflow resume-point log messenger-group monitoring peer-config job-config peer-id job-id group-ch)

next-replica

(next-replica {:keys [log job-id peer-id messenger curr-replica tenancy-id peer-config], :as state} barrier-period-ns new-replica)

offer-barriers

(offer-barriers {:keys [messenger barrier], :as state})

offer-heartbeats

(offer-heartbeats {:keys [messenger], :as state})

periodic-barrier

(periodic-barrier {:keys [tenancy-id workflow-depth log curr-replica job-id messenger barrier checkpoint], :as state})

schedule-next-barrier

(schedule-next-barrier state barrier-period-ns)

shutdown

(shutdown {:keys [messenger peer-id], :as state})

start-coordinator!

(start-coordinator! {:keys [allocation-ch shutdown-ch peer-config job-config], :as state})

start-messenger

(start-messenger messenger replica job-id)

stop-coordinator!

(stop-coordinator! {:keys [shutdown-ch allocation-ch peer-id]} scheduler-event)

write-coordinate

(write-coordinate curr-version log tenancy-id job-id coordinate)