什么是 Extender

现在 Kubernetes 及其衍生生态在工业界和学术界都较为热门。为了将 Kubernetes 运用在更复杂更专业化的领域,Volcano 等扩展调度引擎诞生了。Volcano 提供了一些使用原版 Kubernetes 难以实现的调度方法,为大数据等运用场景提供了较好的调度服务。

然而总还有场景需要用到一些特殊的调度方法,即使是 Volcano 也不可能做到满足所有人的需要。为了给用户提供自定义调度算法的能力,Kubernetes 包含了一套拓展调度器 Extender。关于 Extender 的资料和示例实现互联网上已有许多。所谓 Extender,本质上就是一个 HTTP 服务器,用 API 的形式实现了调度过程中可能会用到的操作。这样一来,我们便可以用任何编程语言来实现我们自己的调度算法,只需要最终封装成 API 接口即可。

Volcano 为了实现针对大数据场合的调度算法,设计并使用了多种 CRD。同样地,Volcano 也提供了 Extender 接口来提供自定义算法的可能性。由于使用了更复杂的调度机制,Volcano 的 Extender 比 Kubernetes 的也要更复杂一些。

撰写此文时,除了官方源码和一个示例以外,互联网上并没有太多关于 Volcano 版 Extender 的资料可供参考。那就自己动手搞一个吧!

Extender 实现

服务器框架

既然 Extender 的重要好处之一就是允许用其他语言开发,考虑到后期可能有搭载深度学习算法的可能性,这次干脆就使用 Python 开发好了。服务器框架选用 Flask。

1
2
3
4
from flask import Flask, abort, request

extender = Flask(__name__)
ext = Ext()

如此建立了一个非常简单的服务器。考虑到 Extender 中会使用到的操作多种多样但实现某个操作与否很自由,干脆 Path 直接动态处理。这里用到了 Flask 的动态 Path 特性:

1
2
3
4
5
6
@extender.route('/<path:verb>', methods=['POST'])
def proc(verb):
if verb not in ext.verbs.keys():
abort(404)

return ext.verbs[verb](request)

所谓“Verb”实际上就是操作,在实现中对应了一个 Path。举例说明,有一个操作需要判断某个 Job 是否可以入队,其 Path 便可以是 /jobEnqueueableVerb。我们将在 ext.verbs 中注册一些已经被实现了的 Verb。如此一来,每次服务器收到请求时都要判断请求的 Path 是不是对应着一个已经被实现的 Verb。如果是,则调用对应的实现函数。如果不是,返回404 Not Found

具体操作实现

在 Volcano 的 Extender 中,每个请求要么没有 Body,要么是一份 JSON 数据。响应亦然。每个 Verb 对应的请求与响应的定义可以参考这里。最简单的 Verb 是 onSessionCloseVerb,用来通知 Extender 会话已经关闭。其请求与响应的 Body 均为空,因此实现非常简单:

1
2
def onSessionOpen(self, request):
return Response('', mimetype='application/json', status=200)

onSessionOpenVerbonSessionCloseVerb 对应,请求的 Body 包含了整个集群的状态信息(官方的示例中称其为“Snapshot”,十分贴切)。考虑到调度过程中可能还需要用到这份信息好吧我到最后也没用到,因此要保存下来备用:

1
2
3
4
5
6
def onSessionOpen(self, request):
try:
self.session = request.get_json()
return Response('', mimetype='application/json', status=200)
except:
return Response('', mimetype='application/json', status=400)

很容易理解,收到请求后先试图解析请求并保存下来。如果解析失败就报400 Bad Request

下面再搞一个请求响应均不为空的 Verb。逻辑参(chao)考(xi)了官方示例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def predicate(self, request):
try:
req = request.get_json()
except:
return Response('', mimetype='application/json', status=400)

if 'task' not in req or 'node' not in req:
return Response('', mimetype='application/json', status=400)

resp = {'ErrorMessage': ''}
if req['task']['BestEffort'] and len(req['node']['Tasks']) > 10:
resp['ErrorMessage'] = 'Already too tasks on this node!'

return Response(json.dumps(resp), mimetype='application/json', status=200)

Predicate 操作试探是否能在某个特定节点上部署某个特定任务。如果能部署则一切安好,如果不能则需要返回一个错误信息。若错误信息不为空,则 Volcano 会认为此次测试失败。这个函数中我们首先解析请求并查看请求是否完整。若完整,则查看任务特性和目标节点任务数量以决定是否通过测试。最后,只要请求完整有效,无论我们是否同意这次调度,都需要返回200 OK,只是要根据情况填写 ErrorMessage

配置 Volcano

以上就实现了一个非常简易的 Extender。下面来试试这个 Extender 是不是真的能用。

Volcano 的配置很简单。执行 kubectl edit configmap volcano-scheduler-configmap -n volcano-system 就可以编辑 Volcano Scheduler 的配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
...
- plugins:
- name: overcommit
- name: drf
#- name: predicates
- name: extender
arguments:
extender.urlPrefix: http://127.0.0.1:2333
extender.httpTimeout: 100ms
extender.onSessionOpenVerb: onSessionOpen
extender.onSessionCloseVerb: onSessionClose
extender.predicateVerb: predicate
...

为了测试,我们注释掉原先使用的 Predicate 插件,同时引入我们土制的 Predicate。默认情况下,保存配置后新配置会立即生效,Volcano Scheduler 将会以每秒若干次(最少一次打开会话、一次关闭会话)的频率访问我们运行在 127.0.0.1:2333 的 Extender。这个时候启动 Extender 就能看到发来的请求。

下面来随意调度一个任务试试。Volcano 的使用例子网上已有不少,就不再赘述。可以用 kubectl logs volcano-scheduler-[根据实际 Pod 名调整] -n volcano-system --tail 1000 来观看 Extender 是否报错,无报错即说明运行正常。

去看一眼 Extender 的输出,会发现 Volcano 真的调用了 Predict 操作!那就让调度失败一回吧,随便往默认的 ErrorMessage 里填入什么,重启任务,会发现任务永远卡在了 Pending 状态。kubectl describe 会看到日志底部即有我们写的错误原因。

パンパカパーン~~~我们成功实现了一个 Volcano Extender!

总结与展望

这次我们实现了一个很简单的 Volcano Extender。麻雀虽小五脏俱全,后面只需要根据实际需要构造其他的 Verb 就可以将其完善乃至投入现实使用。Demo 代码见 https://github.com/dynos01/volcano-extender

在制作这个 Extender 的过程中,我也意识到现在的 Volcano Extender 本身存在一些可以改进的地方。例如,默认设置下每秒都要传输一次集群信息,每次传输的信息量不可小觑(在我的实验中,包含 Headers 的整个请求达到了109KB。而对于 Volcano 擅长的数据科学领域而言,实际上可以认为绝大多数时候每秒的请求都是一样的。这就造成了系统资源的浪费。可能的改进方法包括每次只差分传输或者扩大调度时间粒度。

博客重制后的第一水 Get