溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

簡單任務(wù)調(diào)度系統(tǒng)

發(fā)布時間:2020-06-27 16:22:47 來源:網(wǎng)絡(luò) 閱讀:851 作者:長跑者1號 欄目:編程語言

一 概述

1 運(yùn)維管理的階段

1 人工階段

人工盯著服務(wù)器,出了問題,到機(jī)器前面,翻日志,查狀態(tài),手動操作

2 腳本階段

開始寫一些自動化腳本,啟動計(jì)劃任務(wù),自動啟動服務(wù),監(jiān)控服務(wù)等

3 工具階段

腳本功能太弱,開發(fā)了大量工具,某種工具解決某個特定領(lǐng)域的問題,常用的有ansible,puppet等

4 平臺階段

將工具整合,自主研發(fā),實(shí)現(xiàn)標(biāo)準(zhǔn)化,實(shí)現(xiàn)自動化流程控制,而今,平臺已經(jīng)開始邁向智能化的發(fā)展方向。

二 mschedule 設(shè)計(jì)

1 完整代碼鏈接

https://gitee.com/ChangPaoZhe/mschedule

2要求

1 分發(fā)任務(wù)
分發(fā)腳本到目前節(jié)點(diǎn)上去執(zhí)行


2 控制
控制并發(fā),控制多少個節(jié)點(diǎn)同時執(zhí)行
對錯誤做出響應(yīng),由用戶設(shè)定,最多允許失敗的比例或者數(shù)量,當(dāng)超過范圍時,需要終止任務(wù)執(zhí)行


3 能跨機(jī)房部署


4 能對作業(yè)做版本控制,這是輔助功能,可過后實(shí)現(xiàn)

3 項(xiàng)目基本概述

1 基本概述

本項(xiàng)目的出發(fā)點(diǎn),是只需要會使用shell腳本就可以了,可以通過使用shell腳本的方式來完成遠(yuǎn)程任務(wù)的下發(fā)和處理流程。

2 其他自動化工具二次開發(fā)缺點(diǎn)

ansible,salt等需要學(xué)習(xí)特定的內(nèi)部語言,如果覺得ansible這樣的工具不能滿足需求,二次開發(fā)難度過高,代碼量不小,本身它們開發(fā)接口不完善,而且熟悉它的叫也比較難,就算開發(fā)出來維護(hù)也難。

從這些項(xiàng)目上二次開發(fā),等于拉一個分支,如果主分支有了新的特性,想要合并也是比較困難的。

自己開發(fā),滿足自己需求,完全適合自己需求,代碼規(guī)??煽兀阌谒私邮站S護(hù)。

3 項(xiàng)目初始版本目標(biāo)

自己開發(fā)就是造輪子,造輪子不是不好,其起初要實(shí)現(xiàn)的功能應(yīng)該是比較簡單的。后面可以逐步進(jìn)行完善操作。

4 項(xiàng)目基本架構(gòu)圖

簡單任務(wù)調(diào)度系統(tǒng)

瀏覽器端和webSERVER端交互是通過HTTP實(shí)現(xiàn)的,而WEB server和master server 是通過TCP鏈接來實(shí)現(xiàn)的,master server 和agent之間也是通過TCP 鏈接來實(shí)現(xiàn)的

4 分發(fā)任務(wù)設(shè)計(jì)

1 分發(fā)任務(wù)分類

1 有agent 類

有agent類,被控節(jié)點(diǎn)需要安裝或運(yùn)行特殊的軟件,用于和服務(wù)器端進(jìn)行通信,服務(wù)器端把腳本,命令傳遞給agent端,由agent端控制來執(zhí)行

2 無agent類

被控節(jié)點(diǎn)不需要安裝或者運(yùn)行特殊軟件,如通過SSH來實(shí)現(xiàn),這其實(shí)也是有agent的,不過不是自己寫的程序


優(yōu)缺點(diǎn)

1 通用,簡單,易實(shí)現(xiàn),但管理不善,容易出現(xiàn)安全問題

2 并行效率不高,有agent的并行執(zhí)行可以不和管理服務(wù)器通信,可以并發(fā)很高,ssh執(zhí)行要和master之間通信

3 ssh鏈接是有狀態(tài)的,任務(wù)執(zhí)行的時候,master不能掛了,否則任務(wù)將執(zhí)行失敗。

5 執(zhí)行腳本(subprocess)

python 中有很多運(yùn)行進(jìn)程的方式,不過都過時了。
建議使用標(biāo)準(zhǔn)庫subprocess模塊,啟動一個子進(jìn)程。

1 初始化類源碼

    def __init__(self, args, bufsize=-1, executable=None,
                 stdin=None, stdout=None, stderr=None,
                 preexec_fn=None, close_fds=_PLATFORM_DEFAULT_CLOSE_FDS,
                 shell=False, cwd=None, env=None, universal_newlines=False,
                 startupinfo=None, creationflags=0,
                 restore_signals=True, start_new_session=False,
                 pass_fds=()):

第一個是參數(shù),后面是可選,但shell默認(rèn)為False,可將其置為True, stdout 后面跟文件或管道

        def wait(self, timeout=None, endtime=None):
            """Wait for child process to terminate.  Returns returncode
            attribute."""
            if endtime is not None:
                timeout = self._remaining_time(endtime)
            if timeout is None:
                timeout_millis = _winapi.INFINITE
            else:
                timeout_millis = int(timeout * 1000)
            if self.returncode is None:
                result = _winapi.WaitForSingleObject(self._handle,
                                                    timeout_millis)
                if result == _winapi.WAIT_TIMEOUT:
                    raise TimeoutExpired(self.args, timeout)
                self.returncode = _winapi.GetExitCodeProcess(self._handle)
            return self.returncode

此處返回是狀態(tài),0為成功,其他為失敗


stdout 方法調(diào)用的是一個文件,因此可使用文件的形式進(jìn)行處理

        if c2pread != -1:
            self.stdout = io.open(c2pread, 'rb', bufsize)
            if universal_newlines:
                self.stdout = io.TextIOWrapper(self.stdout)

2 基本代碼如下

#!/usr/bin/poython3.6
#conding:utf-8
import  subprocess
from subprocess  import  Popen,PIPE

out=Popen("echo  'hello'",shell=True,stdout=PIPE)
code=out.wait(10)
txt=out.stdout.read()

print ("code={}  txt={}".format(code,txt.decode()))

結(jié)果如下

簡單任務(wù)調(diào)度系統(tǒng)

6 項(xiàng)目基本構(gòu)建

1 創(chuàng)建文件并添加虛擬環(huán)境

mkdir  mschedule  -p
cd mschedule/
pyenv virtualenv  3.5.3  msch
pyenv local msch 

簡單任務(wù)調(diào)度系統(tǒng)

2 構(gòu)建模塊agent,并創(chuàng)建執(zhí)行程序executor.py

#!/usr/bin/poython3.6
#conding:utf-8
from   subprocess   import  PIPE,Popen

class  Executor:
    def  run(self,script,timeout):
        p=Popen(script,shell=True,stdout=PIPE)
        code=p.wait(timeout=timeout)
        txt=p.stdout.read()
        return  (code,txt)

if __name__ == "__main__":
    exec=Executor()
    print (exec.run("echo  'hello'",3))

結(jié)果如下

簡單任務(wù)調(diào)度系統(tǒng)

7 agent 和master設(shè)計(jì)

用戶和master server 通信,提交任務(wù),此處是通過HTTP的方式提交任務(wù)
master 按照用戶要求將任務(wù)分發(fā)到指定的節(jié)點(diǎn)上,這些節(jié)點(diǎn)上需要有agent用于和master通信,接受master發(fā)布的任務(wù),并執(zhí)行這些任務(wù)


設(shè)計(jì)agent,越簡單越好,越簡單bug越少,越穩(wěn)定。
從本質(zhì)上來說,master,agent設(shè)計(jì)是典型的CS編程模式
master作為CS中的server,agent作為CS中的client

8 消息設(shè)計(jì)

1 注冊信息

agent啟動后,需要主動連接server,并注冊自己
信息包括
hostname:報(bào)告自己的主機(jī)名稱,此主機(jī)名稱可能會重復(fù)

UUID,用于唯一標(biāo)識這臺主機(jī)

IP: 用于更加方便的管理主機(jī)

其它相關(guān)信息視情況而定

 {
  "type": "register",  # 此處用于定義消息類型
            "payload":{
                "id" :  uuid,  #用于唯一標(biāo)識一臺主機(jī)
                "hostname":  "xxxx",  # 對應(yīng)agent名稱
                "IP": [],  # agent IP地址,其可能包含多個IP地址,因此此處使用列表進(jìn)行存儲
            }
    }

2 心跳信息

agent定時向master發(fā)送心跳包,包含UUID這個唯一標(biāo)識,附帶hostname和ip地址,hostname和ip都可能變動,但agent不變,其UUID便不會發(fā)生變化,其他相關(guān)信息科一附加, 如更加flag,用于標(biāo)識agent是否有正在執(zhí)行的任務(wù)。

 {
   "type": "heartbeat",  # 此處用于定義消息類型
            "payload":{
                "id" :  uuid,  #用于唯一標(biāo)識一臺主機(jī)
                "hostname":  "xxxx",  # 對應(yīng)agent名稱
                "IP": [],  # agent IP地址,其可能包含多個IP地址,因此此處使用列表進(jìn)行存儲
            }
}

3 任務(wù)消息

master分派任務(wù)給agent,發(fā)送任務(wù)描述信息到agent。
注意腳本字符串使用base64編碼

 {  
     "type"  :"task",
     "payload" :{
             "id"  :"task-uuid",  # 定義任務(wù)的唯一標(biāo)識
             "script" : "base64code",  #定義執(zhí)行任務(wù)的內(nèi)容
             "timeout"  :0, # 定義超時時長
             "parallel"  :1,  # 定義并行執(zhí)行數(shù)
             "fail_rate"  :0,  # 定義失敗率,及百分比為多少代表失敗
             "fail_count"  :-1 # 定義失敗的次數(shù)為多少次表示失敗,-1表示不關(guān)心
     }

 }

4 任務(wù)結(jié)果消息

當(dāng)agent任務(wù)執(zhí)行完成后,返回給master該任務(wù)執(zhí)行的狀態(tài)碼和輸出結(jié)果。

{
    "type"  :"result",
    "payload" :{
        "id": "task-uuid", # 定義任務(wù)唯一標(biāo)識
        "agent_id":  "agent-uuid",  #定義任務(wù)執(zhí)行者
        "code" : 0,  #定義任務(wù)執(zhí)行結(jié)果返回值。0 表示成功,其他表示失敗 
        "output" :"base64encode"  # 定義任務(wù)執(zhí)行結(jié)果,及輸出到控制臺的結(jié)果

    }
}

以上的master,agent之間需要傳遞消息,消息采用json格式。

三 agent端代碼實(shí)現(xiàn)

1 日志實(shí)現(xiàn)

簡單任務(wù)調(diào)度系統(tǒng)

具體代碼如下

#!/usr/bin/poython3.6
#conding:utf-8
import  logging
def  getlogger(mod_name:str,filepath:str='/var/log/mschedule'):
    logger=logging.getLogger(mod_name)  # 獲取名字
    logger.setLevel(logging.INFO)  # 添加日志級別
    logger.propagate=False  # 配置不想上傳遞
    handler=logging.FileHandler("{}/{}.log".format(filepath,mod_name))
    fmt = logging.Formatter("%(asctime)s [%(levelname)s] %(message)s (%(filename)s:L%(lineno)d)",
                            datefmt='%Y-%m-%d %H:%M:%S')
    handler.setFormatter(fmt)
    logger.addHandler(handler)
    return  logger

if __name__ == "__main__":
    log = getlogger('test')
    log.info('13234545654')

結(jié)果如下

簡單任務(wù)調(diào)度系統(tǒng)

2 通信模塊實(shí)現(xiàn)(zerorpc )

1 介紹和安裝

原生的socket編程過于底層,很少使用,任何一門語言都要避開直接使用socket庫開發(fā),太過底層,難寫難維護(hù)。

zeroprc 是基于 ZeroMQ和MessagePack 來實(shí)現(xiàn)的通信工具。

官網(wǎng)地址

http://www.zerorpc.io

安裝

pip  install  zerorpc  

2 基本代碼實(shí)現(xiàn)

根目錄創(chuàng)建app.py和appserver.py

簡單任務(wù)調(diào)度系統(tǒng)

server 端配置

#!/usr/bin/poython3.6
#conding:utf-8

import zerorpc

class HelloRPC(object):  #定義方法
    def hello(self, name):
        return "Hello, %s" % name

s = zerorpc.Server(HelloRPC())  # 方法注入
s.bind("tcp://0.0.0.0:8080")  # 綁定方法 
s.run()  # 運(yùn)行方法

client端配置

#!/usr/bin/poython3.6
#conding:utf-8
import zerorpc

c = zerorpc.Client()
c.connect("tcp://127.0.0.1:8080")
print (c.hello("RPC"))
#!/usr/bin/poython3.6
#conding:utf-8
import zerorpc
import   threading
c = zerorpc.Client()
c.connect("tcp://127.0.0.1:8080")
e=threading.Event()
while not  e.wait(3):
    print(c.hello('test client'))
    print ('```````````````')

結(jié)果如下

簡單任務(wù)調(diào)度系統(tǒng)

簡單任務(wù)調(diào)度系統(tǒng)

3 注冊消息實(shí)現(xiàn)

1 uuid唯一主機(jī)標(biāo)識

使用uuid.uuid4().hex 獲取一個uuid,一個節(jié)點(diǎn)起始運(yùn)行的時候是沒有uuid的,一旦運(yùn)行會生成一個uuid,并持久化到一個文件中,下次運(yùn)行先找這個文件,如果文件中有uuid,就直接讀取,沒有uuid就重新生成并寫入到該文件中。

#!/usr/bin/poython3.6
#conding:utf-8
#!/usr/bin/poython3.6
#conding:utf-8
import uuid
print  (uuid.uuid4().hex)
print  (uuid.uuid4().hex)
print  (uuid.uuid4().hex)

結(jié)果如下

簡單任務(wù)調(diào)度系統(tǒng)

2 hostname

windows 和Linux 獲取主機(jī)名稱的方式是不同的

可以在所有平臺上是使用socket.gethostname()獲取主機(jī)名。

#!/usr/bin/poython3.6
#conding:utf-8
import  socket
print (socket.gethostname())

簡單任務(wù)調(diào)度系統(tǒng)

3 ip 列表

pip  install   netifaces 

netifaces.interfaces() 返回接口列表

netifaces.ifaddresss(interface) 獲取指定接口的IP地址,返回相關(guān)信息

ip地址判斷

#!/usr/bin/poython3.6
#conding:utf-8
import  ipaddress
ips=['127.0.0.1','192.168.0.1','169.254.123.1','0.0.0.0','239.168.0.255','224.0.0.1','8.8.8.8']

for  ip  in  ips:
    print (ip)
    ip=ipaddress.ip_address(ip)
    print ('Linklocal  {}'.format(ip.is_link_local))  # 169.254地址
    print ('回環(huán) {}'.format(ip.is_loopback))  # 回環(huán)
    print ('多播 {}'.format(ip.is_multicast))   # 多播
    print ('公網(wǎng) {}'.format(ip.is_global))  # 公網(wǎng),全球范圍地址
    print ('私有 {}'.format(ip.is_private))  # 私有地址
    print ('保留 {}'.format(ip.is_reserved))  # 保留地址
    print ('版本 {}'.format(ip.version))  #ipv4地址
    print ('----------------------------')

結(jié)果如下

簡單任務(wù)調(diào)度系統(tǒng)
簡單任務(wù)調(diào)度系統(tǒng)

#!/usr/bin/poython3.6
#conding:utf-8
import  netifaces
print (netifaces.interfaces())  # 獲取所有的網(wǎng)卡接口
for i in  netifaces.interfaces():
    print ('i....',netifaces.ifaddresses(i))  # 使用ifaddress獲取端口對應(yīng)的IP地址
    print ()
    print ('------------------------------')
    print ()
    print ('[2]',netifaces.ifaddresses(i)[2])  # 獲取字典key為2的對應(yīng)的值

結(jié)果如下

簡單任務(wù)調(diào)度系統(tǒng)

其是一個字典,key為2就是ipv4地址
每一個接口返回的ipv4地址是一個列表,也就是說可以有多個,ipv4地址描述是在addr上

#!/usr/bin/poython3.6
#conding:utf-8
import  netifaces
print (netifaces.interfaces())  # 獲取所有的網(wǎng)卡接口
for i in  netifaces.interfaces():
        for   p  in  netifaces.ifaddresses(i)[2]:
            if  p['addr']:
                print ('ip',p['addr'])   # 獲取ip地址 

結(jié)果如下

簡單任務(wù)調(diào)度系統(tǒng)

#!/usr/bin/poython3.6
#conding:utf-8
import  netifaces
import  ipaddress
print (netifaces.interfaces())  # 獲取所有的網(wǎng)卡接口
for i in  netifaces.interfaces():
        for   p  in  netifaces.ifaddresses(i)[2]:
            if  p['addr']:
                ip=ipaddress.ip_address(p['addr'])   #獲取ip地址
                if  ip.is_loopback  or ip.is_multicast  or ip.is_link_local  or ip.is_reserved:  # 判斷IP地址
                    continue
                print (ip)  

結(jié)果如下

簡單任務(wù)調(diào)度系統(tǒng)

4 注冊信息和相關(guān)信息處理

在agent文件包中創(chuàng)建msg.py文件,用于存儲相關(guān)主從信息和配置信息

簡單任務(wù)調(diào)度系統(tǒng)

#!/usr/bin/poython3.6
#conding:utf-8
import  socket
import  uuid
import   netifaces
import  ipaddress
import  os
class Messgae:
    def  __init__(self,myidpath):
        if os.path.exists(myidpath):  # 如果存在
            with  open(myidpath)  as  f:
                self.id=f.readline().strip()
        else:
            self.id=uuid.uuid4().hex
            with open(myidpath,'w')  as f:
                f.write(self.id)

    def get_ipaddress(self):
        address=[]
        for p in  netifaces.interfaces():  # 獲取網(wǎng)口列表
            n=netifaces.ifaddresses(p)  # 獲取字典
            if  n.get(2):  # 查看是否存在ipv4地址
                for  ip  in  n[2]:  # 此處獲取對應(yīng)列表的值
                    if  ip['addr']: # 查看ip地址是否存在
                        ip=ipaddress.ip_address(ip['addr'])
                        if    ip.is_reserved  or ip.is_multicast  or ip.is_link_local or ip.is_loopback:
                            continue
                        address.append(str(ip))
        return  address

    def  hearbeat(self):
        return   {
            "type" :"hearbeat",
            "payload" :{
                "ip"  : self.get_ipaddress(),
                "hostname" : socket.gethostname(),
                "id" : self.id
            }
        }
    def  reg(self):
        return   {
            "type" :"register",
            "payload" :{
                "ip"  : self.get_ipaddress(),
                "hostname" : socket.gethostname(),
                "id" : self.id
            }
        }

if __name__ == "__main__":
    msg=Messgae('/var/log/mschedule/uuid')
    print (msg.reg())

測試結(jié)果如下

簡單任務(wù)調(diào)度系統(tǒng)

5 處理鏈接相關(guān)配置

agent中創(chuàng)建config模塊用于添加相關(guān)鏈接服務(wù)端IP地址
agent中創(chuàng)建cm 模塊用于處理鏈接相關(guān)配置

簡單任務(wù)調(diào)度系統(tǒng)
簡單任務(wù)調(diào)度系統(tǒng)

config.py 配置如下

#!/usr/bin/poython3.6
#conding:utf-8

CONN_URL="tcp://127.0.0.1:9000"

cm.py 模塊配置如下

#!/usr/bin/poython3.6
#conding:utf-8
import   zerorpc   #添加模塊
import  threading  # 用于處理中斷相關(guān)
from  .msg import  Messgae  # 獲取消息

from  .config import  CONN_URL

from  utils  import getlogger

class  Conn_Manager:
    def __init__(self,timeout=3):
        self.timeout=timeout
        self.client=zerorpc.Client()
        self.event=threading.Event()
        self.message=Messgae('/var/log/mschedule/uuid')  # 此處用于初始化消息
        self.log=getlogger('agent')  # 此處填寫相關(guān)的log日志名稱
    def start(self):
        self.client.connect(CONN_URL)  # 鏈接處理
        self.log.info('注冊消息發(fā)送 {}'.format(self.client.send(self.message.reg())))  # 發(fā)送心跳信息
        self.client.send(self.message.reg())  #處理注冊消息
        while  not self.event.wait(self.timeout):  # 等待的時間
            self.log.info('心跳消息發(fā)送 {}'.format(self.client.send(self.message.hearbeat())))  # 發(fā)送心跳信息
    def shutdown(self):
        self.log.info("關(guān)閉操作")
        self.client.close()
        self.event.set()

agent 中 _init_.py 端配置

#!/usr/bin/poython3.6
#conding:utf-8
from  .cm  import Conn_Manager
class  app:
    def __init__(self,timeout):
        self.conn=Conn_Manager(timeout)
    def start(self):
        self.conn.start()
    def shutdown(self):
        self.conn.shutdown()

全局根目錄下 app.py 端配置如下

#!/usr/bin/poython3.6
#conding:utf-8
from  agent  import app

if __name__ == "__main__":
    agent=app(3)
    try:
        agent.start()
    except  KeyboardInterrupt:
        agent.shutdown()

服務(wù)端測試文件appserver 配置如下

#!/usr/bin/poython3.6
#conding:utf-8

import zerorpc

class HelloRPC(object):  #定義方法
    def send(self, name):
        return "Hello, %s" % name

s = zerorpc.Server(HelloRPC())  # 方法注入
s.bind("tcp://0.0.0.0:9000")  # 綁定方法
s.run()  # 運(yùn)行方法

啟動結(jié)果如下

簡單任務(wù)調(diào)度系統(tǒng)

日志結(jié)果如下

簡單任務(wù)調(diào)度系統(tǒng)

簡單任務(wù)調(diào)度系統(tǒng)

處理客戶端重連機(jī)制

默認(rèn)的,服務(wù)端關(guān)閉后,客戶端結(jié)果如下

簡單任務(wù)調(diào)度系統(tǒng)

處理結(jié)果如下

cm.py如下

#!/usr/bin/poython3.6
#conding:utf-8
import   zerorpc   #添加模塊
import  threading  # 用于處理中斷相關(guān)
from  .msg import  Messgae  # 獲取消息

from  .config import  CONN_URL

from  utils  import getlogger

class  Conn_Manager:
    def __init__(self,timeout=3):
        self.timeout=timeout
        self.client=zerorpc.Client()
        self.event=threading.Event()
        self.message=Messgae('/var/log/mschedule/uuid')  # 此處用于初始化消息
        self.log=getlogger('agent')  # 此處填寫相關(guān)的log日志名稱
    def start(self):
        try:
            self.client.connect(CONN_URL)  # 鏈接處理
            self.log.info('注冊消息發(fā)送 {}'.format(self.client.send(self.message.reg())))  # 發(fā)送心跳信息
            self.client.send(self.message.reg())  #處理注冊消息
            while  not self.event.wait(self.timeout):  # 等待的時間
                self.log.info('心跳消息發(fā)送 {}'.format(self.client.send(self.message.hreadbeat())))  # 發(fā)送心跳信息
        except  Exception as e:
            print ('--------------------')
            self.event.set()
            raise  e  # 此處是拋出異常到上一級
    def shutdown(self):
        self.log.info("關(guān)閉操作")
        self.client.close()
        self.event.set()

agent._init_.py 結(jié)果如下

#!/usr/bin/poython3.6
#conding:utf-8
from  .cm  import Conn_Manager
import  threading
class  app:
    def __init__(self,timeout):
        self.conn=Conn_Manager(timeout)
        self.event=threading.Event()
    def start(self):
        while not self.event.is_set():
            try:
                self.conn.start()
            except  Exception  as e:
                    print('重連')
                    self.conn.shutdown()
            self.event.wait(3)

    def shutdown(self):
        self.event.set()
        self.conn.shutdown()

app.py 如下

#!/usr/bin/poython3.6
#conding:utf-8
from  agent  import app

if __name__ == "__main__":
    agent=app(3)
    try:
        agent.start()
    except  KeyboardInterrupt:
        agent.shutdown()

結(jié)果如下

簡單任務(wù)調(diào)度系統(tǒng)

四 master端實(shí)現(xiàn)

1 基本功能

1 TCP Server

綁定端口,啟動監(jiān)聽,等待agent鏈接。

2 信息存儲

存儲agent列表
存儲用戶提交的Task列表,用戶通過WEB提交的任務(wù)信息存儲下來。

3 接受注冊

將注冊信息寫入agent列表
接受心跳信息
接受agent端發(fā)送的心跳信息

4 派發(fā)任務(wù)

將用戶提交的任務(wù)分配到agent端

2 基本代碼實(shí)現(xiàn)

1 master.config 模塊

用于指定服務(wù)端綁定IP地址和端口號

簡單任務(wù)調(diào)度系統(tǒng)

#!/usr/bin/poython3.6
#conding:utf-8

MASTER_URL="tcp://0.0.0.0:9000"

if __name__ == "__main__":
    pass

2 master.handler 模塊

主要負(fù)責(zé)客戶端數(shù)據(jù)的調(diào)度

簡單任務(wù)調(diào)度系統(tǒng)

#!/usr/bin/poython3.6
#conding:utf-8
from   utils  import  getlogger

log=getlogger('handler')

class Handler(object):
    def send(self,msg):  # 定義一個可調(diào)用的基礎(chǔ)函數(shù)
        log.info(" ack  ok  {}".format(msg))
        return   " ack  ok  {}".format(msg)

3 cm.py 模塊

用于tcp 鏈接建立和關(guān)閉

簡單任務(wù)調(diào)度系統(tǒng)

#!/usr/bin/poython3.6
#conding:utf-8
from utils  import  getlogger
from  .config import   MASTER_URL
import  zerorpc
from  .handler import   Handler

log=getlogger('server')

class Master_Listen:
    def __init__(self):
        self.server=zerorpc.Server(Handler())
    def start(self):
        self.server.bind(MASTER_URL)
        log.info('Master 啟動配置')
        self.server.run()
    def shutdown(self):
        self.server.close()

4 master._init_.py 模塊

簡單任務(wù)調(diào)度系統(tǒng)

#!/usr/bin/poython3.6
#conding:utf-8
from  .cm import   Master_Listen

class appserver:
    def __init__(self):
        self.appserver=Master_Listen()
    def start(self):
        self.appserver.start()
    def shutdown(self):
        self.appserver.shutdown()

5 appserver.py模塊

#!/usr/bin/poython3.6
#conding:utf-8

from master  import  appserver

if __name__ == "__main__":
    appserver=appserver()
    try:
        appserver.start()
    except KeyboardInterrupt:
        appserver.shutdown()

啟動服務(wù)測試如下

簡單任務(wù)調(diào)度系統(tǒng)

結(jié)果如下

簡單任務(wù)調(diào)度系統(tǒng)

上述代碼實(shí)現(xiàn)了基本的注冊,心跳部分的功能

經(jīng)觀察可知,目前注冊和心跳除了類型不同外,其可以認(rèn)為第一次心跳成功就是注冊。

3 master的數(shù)據(jù)設(shè)計(jì)

master端核心需要存儲2中數(shù)據(jù):agent端數(shù)據(jù),用戶客戶端瀏覽器提交的任務(wù)Task,構(gòu)造出一個數(shù)據(jù)結(jié)構(gòu),存儲相關(guān)信息.具體數(shù)據(jù)結(jié)構(gòu)如下

1 agent客戶端數(shù)據(jù)存儲結(jié)構(gòu)

{
    "agents" :{
        "agent_id"  :{
            "heartbeat" :"timestamp",
            "busy" :False,
            "info" :{
                "hostname" :"",
                "ip" :[]
            }
        }
    }
}

數(shù)據(jù)結(jié)構(gòu)解釋如下

1 agents里面記錄了所有注冊的agent
agent_id,字典的key,每一個agent 都有一個不同uuid,所以這個字典的鍵就是uuid,
heartbeat 由于設(shè)計(jì)中并沒有讓agent端發(fā)送心跳時間,所以就在master端記錄了收到的時間
busy 如果agent 上有任務(wù)在執(zhí)行。則此值表現(xiàn)為True
info 記錄agent上發(fā)過來的hostname和ip列表

2 task數(shù)據(jù)存儲結(jié)構(gòu)

{
    "tasks" :{
        "task_id" :{
            "script" :"base64encode",
            "targets" :{
                "agent_id" :{
                    "state":"WAITING",
                    "output" :""
                }
            },
                        "state"  :"WAITING"
        }
    }
}

task 記錄所有任務(wù)及target(agent)的狀態(tài)

task_id ,字典的key對應(yīng)一個一個task,item 也是taskid:{} 結(jié)構(gòu)
task 任務(wù),task.json 的payload信息
targets目標(biāo),用于指定agent的節(jié)點(diǎn),記錄agent上的state和輸出output
state狀態(tài),單個agent上的執(zhí)行狀態(tài)

state 這是一個task的狀態(tài),整個任務(wù)的狀態(tài),比如統(tǒng)計(jì)達(dá)到了agent失敗上限了,這個task的state 就置為失敗

狀態(tài)常量
"WAITING" "RUNNING" "SUCCEED" "FAILED"

4 agent 端信息存儲

創(chuàng)建 storage.py 模塊
構(gòu)建Storage 類,用于存儲用戶信息

簡單任務(wù)調(diào)度系統(tǒng)

#!/usr/bin/poython3.6
#conding:utf-8

import  datetime

class Storage:
    def  __init__(self):
        self.agents={}  # 此處用于存儲用戶信息
        self.tasks={}  # 此處用于存儲作業(yè)信息 
    def reg_hb(self,agent_id,info):  # id 及就是客戶端的id ,info 及就是host和ip地址
        self.agents[agent_id] = {
            'heaerbeat' : datetime.datetime.now(),
            'info' :info,
            'busy':self.agents.get(agent_id,{}).get('busy',False)
        }
        # busy 讀不到置False,讀到了不變

handler.py端配置如下

#!/usr/bin/poython3.6
#conding:utf-8
from   utils  import  getlogger
from  .storage import  Storage

log=getlogger('handler')

class Handler(object):
    def __init__(self):
        self.store=Storage()
    def send(self,msg):  # 定義一個可調(diào)用的基礎(chǔ)函數(shù),此處的msg及就是對應(yīng)的函數(shù)
        log.info('客戶端agent發(fā)送消息為:{}'.format(msg))
        try:
            if  msg['type']  in  {'hearbeat','register'}:
                payload=msg['payload']
                info={'hostname' :payload['hostname'],'ip' :payload['ip']}
                self.store.reg_hb(payload['id'],info)
                log.info("客戶端數(shù)據(jù)列表為:{}".format(self.store.agents))  # 客戶端的列表
                return   "agent信息為: {}".format(msg)
        except Exception  as e:
            log.error("注冊客戶端信息錯誤為:{}".format(e))
            return  "Bad  Request...."

運(yùn)行結(jié)果如下

簡單任務(wù)調(diào)度系統(tǒng)

5 task 任務(wù)基本注冊和創(chuàng)建

1 概述

用戶通過WEB(HTTP)提交新的任務(wù),任務(wù)json信息有:
1 任務(wù)腳本script,base64編碼
2 超時時間timeout
3 并行度 parallel
4 失敗率 fail_rate
5 失敗次數(shù)fail_count
6 targets 是跑任務(wù)的Agent的agent_id列表,這個目前也是在用戶端選好的,如yoghurt需要在主機(jī)名為webserver-xxxx的幾臺設(shè)備上運(yùn)行腳本,為了用戶方便,可以使用類似ansible的分組。

在Master端受到信息后,需要添加2個信息

task_id 是Mater 端新建任務(wù)時生成的uuid
state 默認(rèn)狀態(tài)是WAITING

在WEB server 中最后將用戶端發(fā)送來的數(shù)據(jù)組成下面的字典

task={
    "task_id" :t.id,
    "script" :t.script,
    "timeout":t.timeout,
    "parallel" :t.parallelm,
    "fail_rate":t.fail_rate,
    "fail_count":t.fail_count,
    "state":t.state,
    "targets":t.targets
}

2 構(gòu)建state類

用于處理相關(guān)消息的類型

簡單任務(wù)調(diào)度系統(tǒng)

#!/usr/bin/poython3.6
#conding:utf-8

WAITING='WAITING'
RUNNING='RUNNING'
SUCCEED='SUCCEED'
FAILED='FAILED'

3 構(gòu)建task類

創(chuàng)建master/task.py 類處理webserver端數(shù)據(jù)

簡單任務(wù)調(diào)度系統(tǒng)\

#!/usr/bin/poython3.6
#conding:utf-8
import  uuid   # 獲取唯一的task_id

from  .state import *

class Task:
    def  __init__(self,task_id,script,targets,timeout=0,parallel=1,fail_rate=0,fail_count=-1):
        self.id=task_id  # task唯一標(biāo)識,用于確定任務(wù)
        self.script=script  # 對應(yīng)的腳本內(nèi)容,客戶端輸入的腳本
        self.timeout=timeout # 超時時間
        self.parallel=parallel # 并行執(zhí)行數(shù)量
        self.fail_rate=fail_rate  #失敗率
        self.fail_count=fail_count #失敗數(shù)
        self.state=WAITING  # 對應(yīng)的消息的狀態(tài)
        self.targets={agent_id:{'state' : WAITING,'output':''} for agent_id  in targets}  # 此處對應(yīng)客戶端列表
        self.target_count=len(self.targets)  # 此處對應(yīng)客戶端的數(shù)量

在master.storage.py模塊中進(jìn)行相關(guān)方法調(diào)用,并將其存儲進(jìn)入task中

#!/usr/bin/poython3.6
#conding:utf-8

import  datetime
from  .task import  Task

class Storage:
    def  __init__(self):
        self.agents={}  # 此處用于存儲用戶信息
        self.tasks={}  # 此處用于存儲作業(yè)信息
    def reg_hb(self,agent_id,info):  # id 及就是客戶端的id ,info 及就是host和ip地址
        self.agents[agent_id] = {
            'heaerbeat' : datetime.datetime.now(),
            'info' :info,
            'busy':self.agents.get(agent_id,{}).get('busy',False)
        }
        # busy 讀不到置False,讀到了不變

    def add_task(self,task:dict):  # 此處用于從客戶端獲取相關(guān)的數(shù)據(jù)
        t=Task(**task)  # 此處進(jìn)行參數(shù)解構(gòu)
        self.tasks[t.id]=t
        return  t.id  # 此處用于獲取處理id

在master/handler.py 中處理用于webservr調(diào)用相關(guān)配置

#!/usr/bin/poython3.6
#conding:utf-8
from   utils  import  getlogger
from  .storage import  Storage
import  uuid

log=getlogger('handler')

class Handler(object):
    def __init__(self):
        self.store=Storage()
    def send(self,msg):  # 定義一個可調(diào)用的基礎(chǔ)函數(shù),此處的msg及就是對應(yīng)的函數(shù)
        log.info('客戶端agent發(fā)送消息為:{}'.format(msg))
        try:
            if  msg['type']  in  {'hearbeat','register'}:
                payload=msg['payload']
                info={'hostname' :payload['hostname'],'ip' :payload['ip']}
                self.store.reg_hb(payload['id'],info)
                log.info("客戶端數(shù)據(jù)列表為:{}".format(self.store.agents))  # 客戶端的列表
                return   "agent信息為: {}".format(msg)
        except Exception  as e:
            log.error("注冊客戶端信息錯誤為:{}".format(e))
            return  "Bad  Request...."

    def add_task(self,task):   # 此處用于在webserver 端創(chuàng)建的agent調(diào)用方法返回結(jié)果
        task['task_id']=uuid.uuid4().hex  # 用于生成相關(guān)的任務(wù)id
        return   self.store.add_task(task)  # 此處用于調(diào)用相關(guān)配置
    def get_agents(self):
        return  self.store.get_agents()

6 task 任務(wù)分派

1 任務(wù)分派方式

任務(wù)在Storage中存儲,一旦有了任務(wù),需要將任務(wù)分派到指定節(jié)點(diǎn)執(zhí)行,交給這些節(jié)點(diǎn)上的agent
不過,目前使用zerorpc,master是被動的接受agent端的數(shù)據(jù)并進(jìn)行相關(guān)的響應(yīng)操作,所以可以考慮使用一種agent端主動拉取數(shù)據(jù)的機(jī)制,提供一個接口,讓agent訪問,如果agent處于空閑狀態(tài),則就主動拉取任務(wù),有任務(wù)就領(lǐng)走。
當(dāng)agent少的時候,master推送任務(wù)到agent端,或者agent端主動拉取任務(wù)都是可以的,但是如果考慮到agent多的時候,或許使用agent拉模式是一個更好的選擇。

本次采用agent拉取模式實(shí)現(xiàn),所以master就不需要設(shè)計(jì)調(diào)度器了

2 客戶端配置狀態(tài)參數(shù)

agent/state.py

#!/usr/bin/poython3.6
#conding:utf-8

WAITING='WAITING'
RUNNING='RUNNING'
SUCCEED='SUCCEED'
FAILED='FAILED'

3 客戶端添加消息類型result

用于返回至server端,用于最后返回至web瀏覽器端

#!/usr/bin/poython3.6
#conding:utf-8
import  socket
import  uuid
import   netifaces
import  ipaddress
import  os
class Messgae:
    def  __init__(self,myidpath):
        if os.path.exists(myidpath):  # 如果存在
            with  open(myidpath)  as  f:
                self.id=f.readline().strip()
        else:
            self.id=uuid.uuid4().hex
            with open(myidpath,'w')  as f:
                f.write(self.id)

    def get_ipaddress(self):
        address=[]
        for p in  netifaces.interfaces():  # 獲取網(wǎng)口列表
            n=netifaces.ifaddresses(p)  # 獲取字典
            if  n.get(2):  # 查看是否存在ipv4地址
                for  ip  in  n[2]:  # 此處獲取對應(yīng)列表的值
                    if  ip['addr']: # 查看ip地址是否存在
                        ip=ipaddress.ip_address(ip['addr'])
                        if    ip.is_reserved  or ip.is_multicast  or ip.is_link_local or ip.is_loopback:
                            continue
                        address.append(str(ip))
        return  address

    def  hearbeat(self):
        return   {
            "type" :"hearbeat",
            "payload" :{
                "ip"  : self.get_ipaddress(),
                "hostname" : socket.gethostname(),
                "id" : self.id
            }
        }
    def  reg(self):
        return   {
            "type" :"register",
            "payload" :{
                "ip"  : self.get_ipaddress(),
                "hostname" : socket.gethostname(),
                "id" : self.id
            }
        }
    def result(self,task_id,code,output):  # 返回?cái)?shù)據(jù)至web端,處理相關(guān)數(shù)據(jù)執(zhí)行結(jié)果的返回
        return  {
            "type" :"result",
            "payload" :{
                "id"  : task_id,  # 此處用于定義task_id 及任務(wù)id  
                "agent_id" :self.id,  # 此處用于獲取客戶端id  
                "code" : code,  # 此處用于對執(zhí)行結(jié)果狀態(tài)進(jìn)行保存
                "output" : output  #此處用于對執(zhí)行結(jié)果的輸出信息進(jìn)行保存,并進(jìn)行相關(guān)配置
            }
        }

4 agent/cm.py模塊

用于處理配置拉取相關(guān)事宜

#!/usr/bin/poython3.6
#conding:utf-8
import   zerorpc   #添加模塊
import  threading  # 用于處理中斷相關(guān)
from  .msg import  Messgae  # 獲取消息
from  .state import  *
from  .config import  CONN_URL
from  .executor import   Executor
from  utils  import getlogger

class  Conn_Manager:
    def __init__(self,timeout=3):
        self.timeout=timeout
        self.client=zerorpc.Client()
        self.event=threading.Event()
        self.message=Messgae('/var/log/mschedule/uuid')  # 此處用于初始化消息
        self.log=getlogger('agent')  # 此處填寫相關(guān)的log日志名稱
        self.state=WAITING
        self.exec=Executor()
    def start(self):
        try:
            self.event.clear()
            self.client.connect(CONN_URL)  # 鏈接處理
            self.log.info('注冊消息發(fā)送 {}'.format(self.client.send(self.message.reg())))  # 發(fā)送心跳信息
            self.client.send(self.message.reg())  #處理注冊消息
            while  not self.event.wait(self.timeout):  # 等待的時間
                self.log.info('心跳消息發(fā)送 {}'.format(self.client.send(self.message.hearbeat())))  # 發(fā)送心跳信息
                task=self.client.get_task(self.message.id)  # 此處返回三個參數(shù),1 為taskid,二是script ,三是timeout
                if task:
                    code,output=self.exec.run(task[1],task[2])
                    self.client.send(self.message.result(task[0],code,output))
                else:
                    return   "目前無消息"
        except  Exception as e:
            self.event.set()
            raise  e  # 此處是拋出異常到上一級
    def shutdown(self):
        self.log.info("關(guān)閉操作")
        self.client.close()
        self.event.set()

4 服務(wù)端相關(guān)task獲取配置

master/storage.py 用于配置獲取agent_id和task相關(guān)信息

#!/usr/bin/poython3.6
#conding:utf-8

import  datetime
from  .task import  Task
from  .state import   *
class Storage:
    def  __init__(self):
        self.agents={}  # 此處用于存儲用戶信息
        self.tasks={}  # 此處用于存儲作業(yè)信息
    def reg_hb(self,agent_id,info):  # id 及就是客戶端的id ,info 及就是host和ip地址
        self.agents[agent_id] = {
            'heaerbeat' : datetime.datetime.now(),
            'info' :info,
            'busy':self.agents.get(agent_id,{}).get('busy',False)
        }
        # busy 讀不到置False,讀到了不變
    def  get_agents(self):
        return   self.agents

    def add_task(self,task:dict):  # 此處用于從客戶端獲取相關(guān)的數(shù)據(jù)
        t=Task(**task)  # 此處進(jìn)行參數(shù)解構(gòu)
        self.tasks[t.id]=t
        return  t.id  # 此處用于獲取處理id
    @property
    def itme_task(self):
        yield  from  (task  for  task  in  self.tasks.values())  # 此處返回task
    def get_task(self,agent_id):
            return   [task.id,task.script,task.timeout]

master/handler.py 配置如下

#!/usr/bin/poython3.6
#conding:utf-8
from   utils  import  getlogger
from  .storage import  Storage
import  uuid

log=getlogger('handler')

class Handler(object):
    def __init__(self):
        self.store=Storage()
    def send(self,msg):  # 定義一個可調(diào)用的基礎(chǔ)函數(shù),此處的msg及就是對應(yīng)的函數(shù)
        log.info('客戶端agent發(fā)送消息為:{}'.format(msg))
        try:
            if  msg['type']  in  {'hearbeat','register'}:
                payload=msg['payload']
                info={'hostname' :payload['hostname'],'ip' :payload['ip']}
                self.store.reg_hb(payload['id'],info)
                log.info("客戶端數(shù)據(jù)列表為:{}".format(self.store.agents))  # 客戶端的列表
                return   "agent信息為: {}".format(msg)
        except Exception  as e:
            log.error("注冊客戶端信息錯誤為:{}".format(e))
            return  "Bad  Request...."

    def add_task(self,task):   # 此處用于在webserver 端創(chuàng)建的agent調(diào)用方法返回結(jié)果
        task['task_id']=uuid.uuid4().hex  # 用于生成相關(guān)的任務(wù)id
        return   self.store.add_task(task)  # 此處用于調(diào)用相關(guān)配置
    def get_agents(self):
        return  self.store.get_agents()
    def get_task(self,agent_id):
        return  self.store.get_task(agent_id)

5 處理服務(wù)端接受result 消息處理機(jī)制

master/handler.py中配置

#!/usr/bin/poython3.6
#conding:utf-8
from   utils  import  getlogger
from  .storage import  Storage
import  uuid

log=getlogger('handler')

class Handler(object):
    def __init__(self):
        self.store=Storage()
    def send(self,msg):  # 定義一個可調(diào)用的基礎(chǔ)函數(shù),此處的msg及就是對應(yīng)的函數(shù)
        log.info('客戶端agent發(fā)送消息為:{}'.format(msg))
        try:
            if  msg['type']  in  {'hearbeat','register'}:
                payload=msg['payload']
                info={'hostname' :payload['hostname'],'ip' :payload['ip']}
                self.store.reg_hb(payload['id'],info)
                log.info("客戶端數(shù)據(jù)列表為:{}".format(self.store.agents))  # 客戶端的列表
                return   "agent信息為: {}".format(msg)
            elif  msg['type']=="result":  # 此處用于處理相關(guān)返回信息
                self.store.result(msg['payload'])  # 調(diào)用對應(yīng)方法 
        except Exception  as e:
            log.error("注冊客戶端信息錯誤為:{}".format(e))
            return  "Bad  Request...."
    def add_task(self,task):   # 此處用于在webserver 端創(chuàng)建的agent調(diào)用方法返回結(jié)果
        task['task_id']=uuid.uuid4().hex  # 用于生成相關(guān)的任務(wù)id
        return   self.store.add_task(task)  # 此處用于調(diào)用相關(guān)配置
    def get_agents(self):
        return  self.store.get_agents()
    def get_task(self,agent_id):
        return  self.store.get_task(agent_id)
    def get_result(self,task_id):  # 此處返回對應(yīng)的值
        return  self.store.get_result(task_id)

master/stroage.py端配置

#!/usr/bin/poython3.6
#conding:utf-8

import  datetime
from  .task import  Task
from  .state import   *
class Storage:
    def  __init__(self):
        self.agents={}  # 此處用于存儲用戶信息
        self.tasks={}  # 此處用于存儲作業(yè)信息
        self.result={}  # 用于存儲agent端返回的結(jié)果
    def reg_hb(self,agent_id,info):  # id 及就是客戶端的id ,info 及就是host和ip地址
        self.agents[agent_id] = {
            'heaerbeat' : datetime.datetime.now().timestamp(),
            'info' :info,
            'busy':self.agents.get(agent_id,{}).get('busy',False)
        }
        # busy 讀不到置False,讀到了不變
    def  get_agents(self):
        return   self.agents
    def add_task(self,task:dict):  # 此處用于從客戶端獲取相關(guān)的數(shù)據(jù)
        t=Task(**task)  # 此處進(jìn)行參數(shù)解構(gòu)
        self.tasks[t.id]=t
        return  t.id  # 此處用于獲取處理id
    @property
    def itme_task(self):
        yield  from  (task  for  task  in  self.tasks.values())  # 此處返回task
    def get_task(self,agent_id):
        for  task  in  self.itme_task:
            if agent_id  in  task.targets:  # 此處用于判斷當(dāng)前節(jié)點(diǎn)接入任務(wù)情況
                return   [task.id,task.script,task.timeout]
    def add_result(self,payload:dict):
        self.result[payload['id']]=payload  # 此處以task_id 為鍵,以payload為值進(jìn)行處理
    def get_result(self,task_id:dict):
        return self.result.get(task_id['task_id'])  # task_id,獲取對應(yīng)的payload值

五 web端配置和處理

1 概述

用戶通過WEB(HTTP)提交新的任務(wù),任務(wù)json信息有:
1 任務(wù)腳本script,base64編碼
2 超時時間timeout
3 并行度 parallel
4 失敗率 fail_rate
5 失敗次數(shù) fail_count
6 targets 是跑在agent上的agent_id 列表,可以讓用戶看到一個列表,通過列表的勾選來完成相關(guān)的操作

2 代碼實(shí)現(xiàn)

根目錄創(chuàng)建appwebserver.py配置

簡單任務(wù)調(diào)度系統(tǒng)

1 獲取agent相關(guān)列表

#!/usr/bin/poython3.6
#conding:utf-8
import  zerorpc
from  aiohttp  import  request,web_response,web,log
CONN_URL="tcp://127.0.0.1:9000"

client=zerorpc.Client()
client.connect(CONN_URL)
async  def  targetshandler(request:web.Request):
    txt=client.get_agents()  #通過zerorpc調(diào)用master端接口
    return  web.json_response(txt)  # 返回json端數(shù)據(jù)

app=web.Application()

app.router.add_get('/task/targets',targetshandler)  # 使用get方法進(jìn)行處理

2 提交任務(wù)端配置

1 客戶端數(shù)據(jù)如下
{
    "script"  : "echo  hello",
    "timeout" :20,
    "targets"  :[]
}
2 添加提交數(shù)據(jù)接口
async def  taskhandler(request:web.Request):
    j = await  request.json()  # 獲取post 提交的數(shù)據(jù),用于task任務(wù)數(shù)據(jù)生成
    txt=client.add_task(j)
    return   web.Response(text=txt,status=201)

app.router.add_post('/task',taskhandler)

3 添加獲取執(zhí)行結(jié)果配置

async   def  taskresult(request:web.Request):
    j = await  request.json()
    txt =client.get_result(j)
    return   web.json_response(txt)

app.router.add_post('/result',taskresult)

4 整體代碼如下

#!/usr/bin/poython3.6
#conding:utf-8
import  zerorpc
from  aiohttp  import  request,web_response,web,log
CONN_URL="tcp://127.0.0.1:9000"

client=zerorpc.Client()
client.connect(CONN_URL)
async  def  targetshandler(request:web.Request):
    txt=client.get_agents()  #通過zerorpc調(diào)用master端接口
    return  web.json_response(txt)  # 返回json端數(shù)據(jù)

app=web.Application()

app.router.add_get('/task/targets',targetshandler)  # 使用get方法進(jìn)行處理

async def  taskhandler(request:web.Request):
    j = await  request.json()
    txt=client.add_task(j)
    return   web.Response(text=txt,status=201)

app.router.add_post('/task',taskhandler)

async   def  taskresult(request:web.Request):
    j = await  request.json()
    txt =client.get_result(j)
    return   web.json_response(txt)

app.router.add_post('/result',taskresult)

if __name__ == "__main__":
    web.run_app(app,host='0.0.0.0',port=80)

3 測試結(jié)果如下

簡單任務(wù)調(diào)度系統(tǒng)

六 處理數(shù)據(jù)和節(jié)點(diǎn)狀態(tài)

1 狀態(tài)管理類型

1 節(jié)點(diǎn)狀態(tài)

當(dāng)節(jié)點(diǎn)在進(jìn)行相關(guān)事件調(diào)度處理時,其狀態(tài)應(yīng)該是RUNNING狀態(tài),當(dāng)處理完成后,其狀態(tài)應(yīng)該恢復(fù)稱為WAITING狀態(tài)。

2 task 任務(wù)狀態(tài)

當(dāng)當(dāng)前agent下的所有該任務(wù)都執(zhí)行完成時的狀態(tài),此處設(shè)計(jì)較為簡單,只是全部執(zhí)行就將其狀態(tài)置位成功,否則為RUNNING狀態(tài)或者WAITING,當(dāng)有一個agent領(lǐng)取任務(wù)時,其狀態(tài)將被置為RUNNING。

3 task中對應(yīng)的agent的狀態(tài)

及就是當(dāng)前節(jié)點(diǎn)執(zhí)行當(dāng)前任務(wù)的狀態(tài),此狀態(tài)保存在task中的targets字典中,用于對其客戶端執(zhí)行結(jié)果進(jìn)行判斷而獲取其對應(yīng)狀態(tài)。

2 客戶端調(diào)整代碼

主要是cm.py調(diào)整如下

#!/usr/bin/poython3.6
#conding:utf-8
import   zerorpc   #添加模塊
import  threading  # 用于處理中斷相關(guān)
from  .msg import  Messgae  # 獲取消息
from  .state import  *
from  .config import  CONN_URL
from  .executor import   Executor
from  utils  import getlogger

class  Conn_Manager:
    def __init__(self,timeout=3):
        self.timeout=timeout
        self.client=zerorpc.Client()
        self.event=threading.Event()
        self.message=Messgae('/var/log/mschedule/uuid')  # 此處用于初始化消息
        self.log=getlogger('agent')  # 此處填寫相關(guān)的log日志名稱
        self.state=WAITING
        self.exec=Executor()
    def start(self):
        try:
            self.event.clear()
            self.client.connect(CONN_URL)  # 鏈接處理
            self.log.info('注冊消息發(fā)送 {}'.format(self.client.send(self.message.reg())))  # 發(fā)送心跳信息
            self.client.send(self.message.reg())  #處理注冊消息
            while  not self.event.wait(self.timeout):  # 等待的時間
                self.log.info('心跳消息發(fā)送 {}'.format(self.client.send(self.message.hearbeat())))  # 發(fā)送心跳信息
                if  self.state == WAITING:  # 如果此處是空閑狀態(tài),則進(jìn)行領(lǐng)任務(wù)處理
                    print('獲取任務(wù)task')
                    task = self.client.get_task(self.message.id)  # 此處返回三個參數(shù),1 為taskid,二是script ,三是timeout
                    if task:  # 領(lǐng)取成功,則進(jìn)行執(zhí)行相關(guān)任務(wù).并上傳至服務(wù)器端其狀態(tài)
                        self.state = RUNNING  # 此處任務(wù)成功的情況
                        code,output=self.exec.run(task[1],task[2])
                        self.client.send(self.message.result(task[0], code, output))
                        self.state=WAITING  #狀態(tài)更新為當(dāng)前正常狀態(tài)
                    else:
                        return   "目前無消息"
        except  Exception as e:
            self.event.set()
            raise  e  # 此處是拋出異常到上一級
    def shutdown(self):
        self.log.info("關(guān)閉操作")
        self.client.close()
        self.event.set()

3 master端代碼調(diào)整

master/storage.py

#!/usr/bin/poython3.6
#conding:utf-8

import  datetime
from  .task import  Task
from  .state import   *
from utils  import  getlogger
log=getlogger('storage')
class Storage:
    def  __init__(self):
        self.agents={}  # 此處用于存儲用戶信息
        self.tasks={}  # 此處用于存儲作業(yè)信息
        self.result={}  # 用于存儲agent端返回的結(jié)果
        self.task_state=0 # 用于處理當(dāng)所有agent狀態(tài)都修改為成功或失敗時將task的狀態(tài)也進(jìn)行相關(guān)的修改
    def reg_hb(self,agent_id,info):  # id 及就是客戶端的id ,info 及就是host和ip地址
        self.agents[agent_id] = {
            'heaerbeat' : datetime.datetime.now().timestamp(),
            'info' :info,
            'busy':self.agents.get(agent_id,{}).get('busy',False)
        }
        # busy 讀不到置False,讀到了不變
    def  get_agents(self):
        return   self.agents
    def add_task(self,task:dict):  # 此處用于從客戶端獲取相關(guān)的數(shù)據(jù)
        t=Task(**task)  # 此處進(jìn)行參數(shù)解構(gòu)
        self.tasks[t.id]=t
        return  t.id  # 此處用于獲取處理id
    @property
    def itme_task(self):
        yield  from  (task  for  task  in  self.tasks.values()  if  task.state  in  {WAITING,RUNNING})  # 此處返回task,當(dāng)其中有成功或者失敗時,則不用進(jìn)行相關(guān)的操作處理
        #當(dāng)為WAITING或者RUNNING 時,則進(jìn)行相關(guān)的操作,其他情況則不進(jìn)行相關(guān)操作
    def get_task(self,agent_id):
        for  task  in  self.itme_task:
            if agent_id  in  task.targets:  # 此處用于判斷當(dāng)前節(jié)點(diǎn)接入任務(wù)情況
                if task.state==WAITING:
                    task.state=RUNNING  #當(dāng)前消息的狀態(tài)
                task.targets[agent_id]['state']=RUNNING  # 此處是指此消息中的agent是否執(zhí)行的狀態(tài)的處理,若獲取了,則此處的狀態(tài)為RUNNING
                return   [task.id,task.script,task.timeout]
    def add_result(self,payload:dict):
        for task in self.itme_task:
            if  payload['code']==0:
                task.targets[payload['agent_id']]['state']=SUCCEED  # 此處是指對此消息進(jìn)行處理,若code=0,則表示客戶端執(zhí)行成功,若為1,則表示失敗
                self.task_state+=1
            else:
                task.targets[payload['agent_id']]['state']= FAILED#
                self.task_state+=1
            if self.task_state==task.target_count:
                task.state=SUCCEED
                self.task_state=0
            payload['agent_state']=task.targets[payload['agent_id']]['state']
        log.info("當(dāng)前消息內(nèi)容為:{}".format(self.result))
        self.result[payload['id']]=payload  # 此處以task_id 為鍵,以payload為值進(jìn)行處理
    def get_result(self,task_id:dict):
        task_id=task_id['task_id']
        return self.result.get(task_id)  # task_id,獲取對應(yīng)的payload值

4 webserver端代碼調(diào)整如下

webappserver.py

#!/usr/bin/poython3.6
#conding:utf-8
import  zerorpc
from  aiohttp  import  request,web_response,web,log
CONN_URL="tcp://127.0.0.1:9000"

client=zerorpc.Client()
client.connect(CONN_URL)
async  def  targetshandler(request:web.Request):
    txt=client.get_agents()  #通過zerorpc調(diào)用master端接口
    return  web.json_response(txt)  # 返回json端數(shù)據(jù)

app=web.Application()

app.router.add_get('/task/targets',targetshandler)  # 使用get方法進(jìn)行處理

async def  taskhandler(request:web.Request):
    j = await  request.json()
    txt=client.add_task(j)
    return   web.Response(text=txt,status=201)

app.router.add_post('/task',taskhandler)

async   def  taskresult(request:web.Request):
    j = await  request.json()
    txt =client.get_result(j)
    if txt['code']  !=0:
        txt['output']='參數(shù)不正確,請重新輸入'
    return   web.json_response(txt)

app.router.add_post('/result',taskresult)

if __name__ == "__main__":
    web.run_app(app,host='0.0.0.0',port=80)

5 結(jié)果如下

簡單任務(wù)調(diào)度系統(tǒng)

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI