Nona: A Stochastic Congestion-Aware Job Scheduler for Real-Time Inference Queries

MIT, EURECOM, Technion
CloudNet'24

Abstract

This paper proposes a novel queueing-theoretic approach to enable stochastic congestion-aware scheduling for distributed machine learning inference queries. Our proposed framework, called Nona, combines a stochastic scheduler with an offline optimization formulation rooted in queueing-theoretic principles to minimize the average completion time of heterogeneous inference queries. At its core, Nona incorporates the fundamental tradeoffs between compute and network resources to make efficient scheduling decisions. Nona’s formulation uses the Pollaczek–Khinchine formula to estimate queueing latency and to predict system congestion. Building upon conventional Jackson networks, it captures the dependency between the computation and communication operations of interfering jobs. From this formulation, we derive an optimization problem and use its results as inputs for the scheduler. We introduce a novel graph contraction procedure to enable cloud providers to solve Nona’s optimization formulation in practical settings. We evaluate Nona with real-world machine learning models (AlexNet, ResNet, DenseNet, VGG, and GPT2) and demonstrate that Nona outperforms state-of-the-art schedulers by up to 350×.

Crafting the Optimization Formulation

We present in the paper a high-level description of the optimization formulation used in Nona. This page provides more insight into the design choices behind the optimization. The problem consists of deriving an expression for the average completion time of flows of jobs in a cluster, taking both networking and compute resources into account. For simplicity, we only consider queueing at a single bottleneck link in the network, but the formulation could be extended to support topologies with multiple bottlenecks.
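The queueing model at this bottleneck is the Pollaczek–Khinchine mean waiting time of an M/G/1 queue, the same formula that appears in the formulation below. A minimal sketch, with made-up flow rates and transfer sizes:

```python
def pk_wait(flows, mu):
    """Mean queueing delay on an M/G/1 bottleneck link (Pollaczek-Khinchine).

    flows: list of (arrival_rate, transfer_size_bits) pairs sharing the link
    mu:    link capacity in bits/s
    """
    rho = sum(lam * (bits / mu) for lam, bits in flows)  # average link load
    assert rho < 1, "unstable queue: offered load exceeds link capacity"
    # Aggregate second moment of the service times, weighted by arrival rates
    second_moment = sum(lam * (bits / mu) ** 2 for lam, bits in flows)
    return second_moment / (2 * (1 - rho))

# Two transfer flows sharing a 100 Mbit/s link (illustrative numbers only)
delay = pk_wait([(10.0, 1e6), (5.0, 4e6)], mu=1e8)
```

The same quantity appears as \(\phi\) in the formulation: congestion grows non-linearly as the load \(\rho\) approaches 1, which is why the scheduler must account for the transfers induced by its own placement decisions.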


The variables of the problem are the assignment probabilities of tasks to servers. Since two tasks placed on different servers incur a data transfer over the network, the placement of a task should be conditioned on the placement of the tasks before it. The size of the problem is therefore proportional to the number of tasks in the graph and, for each task, to the number of other tasks on which its placement is conditioned. The first simplification consists in noticing that the Directed Acyclic Graphs (DAGs) of inference queries can be subdivided into smaller sub-graphs that can be scheduled almost independently. For example, the graph below can be split into two sub-graphs that we can reason about independently. We call roots and sinks the first and last tasks of a sub-graph (tasks \(1\) and \(4\) are roots, and tasks \(4\) and \(10\) are sinks).

[Figure: the initial DAG, split into the first and second sub-graphs]

In this case, the dependency in placing task \(5\) is entirely captured by the placement of task \(4\): it is conditionally independent of the placement of tasks \(1\), \(2\), and \(3\). However, placement dependencies between tasks within a sub-graph, say tasks \(6\) and \(8\), still need to be considered.
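These sub-graph boundaries can be found mechanically: scanning tasks in topological order, a task at which no edge remains "open" (started but not yet closed) is a vertex through which every path passes, and therefore a root/sink boundary. A sketch, with a hypothetical edge list standing in for the ten-task figure above:

```python
from collections import defaultdict

def split_points(order, edges):
    """Return the tasks through which every root-to-sink path passes.

    order: tasks in topological order; edges: list of (u, v) pairs.
    When the count of open edges drops to zero at a task, every path
    goes through it, so the DAG can be cut there into sub-graphs.
    """
    in_deg, out_deg = defaultdict(int), defaultdict(int)
    for u, v in edges:
        out_deg[u] += 1
        in_deg[v] += 1
    open_edges, points = 0, []
    for t in order:
        open_edges -= in_deg[t]     # edges ending at t are now closed
        if open_edges == 0:
            points.append(t)        # every path passes through t
        open_edges += out_deg[t]    # edges starting at t are now open
    return points

# Hypothetical edge list for the ten-task example DAG (illustrative only)
EDGES = [(1, 2), (1, 3), (2, 4), (3, 4),
         (4, 5), (4, 6), (4, 9), (5, 7), (6, 8),
         (7, 10), (8, 10), (9, 10)]
print(split_points(list(range(1, 11)), EDGES))   # prints [1, 4, 10]
```

With this edge list, the boundaries are tasks 1, 4, and 10, matching the roots and sinks above.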


We initially attempted to implement this dependency by conditioning the placement of every task in the sub-graph on the placement of some of the other tasks, but this yielded a prohibitively large problem space. Instead, we use a single variable per sub-graph that incorporates all the dependencies between its tasks. More specifically, the placement of a sub-graph corresponds to a partition of its tasks, where each part of the partition is placed on a different server. For convenience, we exclude the root and the sink of the sub-graph from the set of tasks to partition. This leads us to consider placements not as absolute task-to-server allocations, but as decisions relative to the placement of the other tasks in the sub-graph. The direct implication is that the size of the optimization is independent of the number of servers: the only questions are of the form "is this task placed on the same server as this other task?" rather than "which server should this task go to?". As an example, one sub-graph partition of the first four tasks could be to run tasks \(1\) and \(2\) on one server, and tasks \(3\) and \(4\) on another server:
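The number of such relative placements is the number of set partitions of the inner tasks (a Bell number), regardless of how many servers the cluster has. A small sketch enumerating them:

```python
def partitions(items):
    """Yield every set partition of `items` (there are Bell-number many)."""
    if not items:
        yield []
        return
    first, rest = items[0], items[1:]
    for p in partitions(rest):
        for i in range(len(p)):                      # join an existing part...
            yield p[:i] + [[first] + p[i]] + p[i + 1:]
        yield p + [[first]]                          # ...or open a new part

# Inner tasks of the first sub-graph (root 1 and sink 4 excluded)
placements = list(partitions([2, 3]))
# Only two relative placements exist: {2, 3} together, or 2 and 3 apart --
# the count is independent of the number of servers in the topology.
```

For five inner tasks there are 52 partitions; the variable count grows with the Bell number of the sub-graph size, not with cluster size, which is what makes the graph contraction below so valuable.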

[Figure: example partition]

As mentioned above, the complexity of the optimization problem depends on the number of tasks in the DAG. The third technique therefore consists in contracting tasks in the graph, removing edges that should never be distributed. The contraction procedure is based on the idea that we want to contract as many edges as possible while preserving data dependencies between tasks. In most cases, the non-contractible edges are those whose distribution can delay the start of another task. The contraction of the second sub-graph above gives a new sub-graph similar to the first one:

[Figure: contracted DAG]

A more complete example of graph contraction, as well as the exact conditions to determine whether an edge should be contracted are given in the paper.
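The exact contraction conditions are in the paper; as a simplified illustration, here is a heuristic that only contracts pure series edges, i.e. edges \((u, v)\) where \(v\) is \(u\)'s only successor and \(u\) is \(v\)'s only predecessor, since distributing such an edge can never start another task earlier. The edge list below is assumed for illustration:

```python
from collections import defaultdict

def contract_chains(edges):
    """Contract pure series edges: (u, v) with v the only successor of u
    and u the only predecessor of v. The two tasks can always be merged
    without delaying the start of any other task."""
    succ, pred = defaultdict(list), defaultdict(list)
    for u, v in edges:
        succ[u].append(v)
        pred[v].append(u)
    alias = {}                        # merged task -> its representative
    def find(x):
        while x in alias:
            x = alias[x]
        return x
    for u, v in edges:
        if len(succ[u]) == 1 and len(pred[v]) == 1:
            alias[v] = find(u)        # merge v into u's representative
    contracted = {(find(u), find(v)) for u, v in edges if find(u) != find(v)}
    return sorted(contracted)

# Hypothetical edges of the second sub-graph (illustrative only)
second = [(4, 5), (5, 7), (7, 10), (4, 6), (6, 8), (8, 10), (4, 9), (9, 10)]
print(contract_chains(second))
```

On this example, the chains \(5 \to 7\) and \(6 \to 8\) collapse, leaving a fan-out/fan-in sub-graph with root \(4\) and sink \(10\), similar in shape to the first sub-graph.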


The objective function of Nona's optimization formulation computes the expected job completion time as a function of the available cluster resources and of the task partition probabilities. To obtain the final result, we propagate computation and communication costs across the DAGs. In the example above, this corresponds to the following:

  1. Compute task \(1\)
  2. Transfer the data required for tasks \(2\) and \(3\). The amount of data transferred is the per-query data size, weighted by the probability of placing either task on a different server than task \(1\).
  3. Compute tasks \(2\) and \(3\). Their compute times are summed if \(2\) and \(3\) are in the same part (they share a server), and the tasks run in parallel otherwise.
  4. Transfer the data back to task \(4\).
  5. Compute task \(4\), and so on for the next sub-graph.
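The steps above can be sketched numerically for the four-task diamond sub-graph. Compute sizes, data sizes, and partition probabilities below are made-up, queueing delay is ignored, and the inner part is assumed never to share a server with the root or the sink:

```python
NU, MU = 1e9, 1e9     # server speed (ops/s) and link rate (bits/s), assumed
p = {1: 2e8, 2: 4e8, 3: 3e8, 4: 1e8}                      # ops per task, assumed
b = {(1, 2): 1e7, (1, 3): 1e7, (2, 4): 5e6, (3, 4): 5e6}  # bits per edge, assumed

def completion(colocated_23):
    """Completion time of the diamond 1 -> {2, 3} -> 4 for one partition."""
    t = p[1] / NU                                  # step 1: compute task 1
    if colocated_23:                               # 2 and 3 in the same part
        t += b[(1, 2)] / MU                        # step 2: one input transfer
        t += (p[2] + p[3]) / NU                    # step 3: sequential compute
        t += b[(2, 4)] / MU                        # step 4: one output transfer
    else:                                          # 2 and 3 in different parts
        branch2 = b[(1, 2)] / MU + p[2] / NU + b[(2, 4)] / MU
        branch3 = b[(1, 3)] / MU + p[3] / NU + b[(3, 4)] / MU
        t += max(branch2, branch3)                 # steps 2-4 run in parallel
    return t + p[4] / NU                           # step 5: compute task 4

theta = {True: 0.6, False: 0.4}    # assumed partition probabilities
expected = sum(prob * completion(c) for c, prob in theta.items())
```

The expected completion time is the probability-weighted average over the two partitions, exactly the structure of the objective in the formulation below.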

Computing the weighted averages is not straightforward. For a chosen partition, we examine the completion time of each part. Within a part, all tasks run on the same server and must therefore run sequentially. In complex graphs, some tasks in a part may depend on tasks that are outside their part and different from the root of the sub-graph. For simplicity, we assume the contracted graphs do not exhibit this situation; in practice, we never encountered it in any of the inference DAGs we tested. The completion time of a part relative to the root is thus the sum of (\(i\)) the compute time of its tasks, (\(ii\)) the communication time for the input of the part, and (\(iii\)) the communication time for the output of the part (Equation 10 below). The completion time of a sub-graph is then the completion time of its longest part, plus the compute time of the sink node (Equation 11 below). Finally, these completion times are averaged over the possible partitions, weighted by the probability of choosing each partition.
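Informally, the two aggregation steps can be sketched as follows. This is a simplification of Equations 10 and 11: transfer costs are taken uniform per part rather than weighted by placement, and all numbers are made-up:

```python
def part_time(part, p, nu, c_in, c_out):
    """Eq. 10, informally: completion of a part relative to the root =
    (i) sequential compute of its tasks + (ii) input transfer time
    + (iii) output transfer time."""
    return sum(p[t] for t in part) / nu + c_in + c_out

def subgraph_time(partition_probs, sink_ops, p, nu, c_in, c_out):
    """Eq. 11, informally: probability-weighted average over partitions of
    the slowest part, plus the compute time of the sink."""
    avg = sum(prob * max(part_time(part, p, nu, c_in, c_out) for part in parts)
              for parts, prob in partition_probs)
    return avg + sink_ops / nu

# Made-up instance: first sub-graph, inner tasks 2 and 3, sink 4
p = {2: 4e8, 3: 3e8}              # ops per task
opts = [([[2, 3]], 0.5),          # colocate 2 and 3: one sequential part
        ([[2], [3]], 0.5)]        # split them: two parallel parts
kappa = subgraph_time(opts, sink_ops=1e8, p=p, nu=1e9, c_in=0.01, c_out=0.01)
```

Colocating yields one slow sequential part; splitting yields two faster parallel parts but more transfers. The weighted average over both options is the sub-graph completion time \(\kappa\).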

Formulation

The notations used in the paper are summarized here.
Input
\(\mathcal{J}\) Set of jobs (i.e. of model DAGs)
\(\mathcal{G}_j(\mathcal{T}^j,\mathcal{D}^j)\) Compute DAG of job class \( j \) with tasks \( \mathcal{T}^j \) and directed edges \( \mathcal{D}^j \).
\(\beta_j\) Mean arrival rate of DAG \( j \in \mathcal{J} \).
\(b^{(t, t')}\) The amount of data output by task \( t \) required to compute task \( t' \) (bits).
\(\mu\) Capacity of the bottleneck link (bits/s).
\(p^t\) The compute size of task \( t \) (number of operations).
\(\nu\) The compute power of servers (operations/s).
Auxiliary variables
\(\tau^j\) Average completion time for job class \( j \) (s).
\(\pi\) Partition of a sub-graph.
\(\pi_k\) Part \( k \) of \( \pi \).
\((\pi, \pi_k, \pi_l)\) Triplet denoting the choice of partition \(\pi\) for a given sub-graph, with the root of the sub-graph placed in part \(\pi_k\) and the sink placed in part \(\pi_l\).
\(\Omega_t\) Sample space for triplets \((\pi, \pi_k, \pi_l)\) for the sub-graph with task \(t\) as root.
\(\mathscr{P}\), \(\mathscr{S}\) Functions associating a set of tasks to respectively the set of their predecessors and successors.
\(\mathcal{N}\) Function associating the root of a sub-graph to its closest child root.
\(\lambda_{(t, t')}\) Average arrival rate of data transfers from task \(t\) to task \(t'\) on the link.
\(S_{(t, t')}\) Service time for the data transfer between tasks \(t\) and \(t'\).
\(\rho\) Average load of the link.
\(\phi\) Average queueing delay on the link.
\(c_{(t, t')}\) Sum of average service and queueing delay.
\(\chi_{t}[(\pi, \pi_i)]\) Assuming partition \( \pi \) was chosen for the sub-graph with root \(t\), average relative completion time of all tasks in \( \pi_i \).
\(\kappa^t\) Average completion time of sub-graph with root \(t\). The average is taken over the maxima part completion times for a given partition, weighted by the probability of choosing that partition.
Optimization variables
\(\theta_t\) Probability distribution on \(\Omega_t\)

Using the notations given above, the optimization problem used in Nona can be found below.


$$\begin{align} \text{Minimize} &&& \sum_{j \in \mathcal{J}} \beta_j\tau^j\\ \text{Subject to} &&& \sum_{\Omega_t} \theta_t[(\pi, \pi_k, \pi_l)] = 1\\ &&& \theta_t[(\pi, \pi_k, \pi_l)] \geq 0\\ &&& \lambda_{(t, t')} = \sum_{\Omega_t} \beta_j \theta_t[(\pi, \pi_k, \pi_l)] \cdot \begin{cases} \mathbb{1}(t' \notin \pi_k) & \text{if }t \text{ is a root}\\ \mathbb{1}(t \notin \pi_l) & \text{if }t' \text{ is a root}\\ \mathbb{1}((t, t') \notin \pi) & \text{o.w.} \end{cases}\\ &&& S_{(t, t')} = \frac{b^{(t, t')}}{\mu}\\ &&& \rho = \sum\limits_{j\in\mathcal{J}}\sum\limits_{t\in\mathcal{T}^j}\sum\limits_{t'\in\mathscr{S}(t)}\lambda_{(t, t')}S_{(t, t')}\\ &&& \rho < 1\\ &&& \phi = \frac{\sum\limits_{j\in\mathcal{J}}\sum\limits_{t\in\mathcal{T}^j}\sum\limits_{t'\in\mathscr{S}(t)}\lambda_{(t, t')}S_{(t, t')}^2}{2\left(1 - \rho\right)}\\ &&& c_{(t, t')} = \phi + S_{(t, t')}\\ &&& \chi_{t}[(\pi, \pi_i)] = \begin{aligned}[t]&\sum_{t_2\in\pi_i}\sum_{t_1 \in \mathscr{P}(t_2)} \bigg[\frac{p^{t_2}}{\nu} + \mathbb{1}(t_1 \notin \pi_i \land t_1 \neq t) c_{(t_1, t_2)} + \mathbb{1}(t_1 = t \land i \neq k)c_{(t_1, t_2)} \bigg]\\ &+ \mathbb{1}(i \neq l)\sum_{t_1 \in \pi_i | (t_1, \mathcal{N}(t)) \in \mathcal{D}^j} c_{(t_1, \mathcal{N}(t))}\end{aligned}\\ &&& \kappa^t = \sum_{\Omega_t}\theta_t[(\pi, \pi_k, \pi_l)] \max_{\pi_i \in \pi}\left\{\chi_{t}[(\pi, \pi_i)]\right\} + \frac{p^{\mathcal{N}(t)}}{\nu}\\ \end{align}$$
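To see how congestion couples with the placement probabilities, here is a toy, self-contained instance with one sub-graph and a single decision \(\theta\), the probability of splitting tasks \(2\) and \(3\) across servers. All constants are invented for illustration, and a grid search stands in for a real solver:

```python
BETA, MU = 2.0, 1e8          # job arrival rate (1/s), link capacity (bits/s)
SPLIT_BITS = 2e7             # bits crossing the link when tasks 2 and 3 split
COMPUTE = {"colocated": 0.7, "split": 0.4}   # critical-path compute times (s)

def expected_time(theta):
    """Expected completion time when tasks 2 and 3 are split w.p. theta."""
    S = SPLIT_BITS / MU                      # transfer service time
    lam = BETA * theta                       # transfer arrival rate on the link
    rho = lam * S                            # link load induced by splitting
    if rho >= 1:
        return float("inf")                  # unstable queue: infeasible
    phi = lam * S**2 / (2 * (1 - rho))       # Pollaczek-Khinchine waiting time
    colocated = (1 - theta) * COMPUTE["colocated"]
    split = theta * (COMPUTE["split"] + S + phi)
    return colocated + split

# Grid search over theta (a stand-in for solving the optimization above)
best_val, best_theta = min((expected_time(t / 100), t / 100) for t in range(101))
```

In this toy instance the optimum is an interior point (\(\theta\) strictly between 0 and 1): always splitting would congest the link, always colocating wastes parallelism, so the best policy splits only a fraction of the queries. This is precisely why Nona's scheduler is stochastic rather than deterministic.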