您好,登錄后才能下訂單哦!
這篇文章主要介紹了python如何實現(xiàn)注冊釘釘回調(diào)事件,具有一定借鑒價值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。
1、注冊端
2、回調(diào)端:以下示例代碼為python2,django 框架
釘釘API文檔:https://ding-doc.dingtalk.com/doc#/serverapi2/skn8ld
釘釘有回調(diào)事件流程,有哪些回調(diào)?比如:通訊錄回調(diào)、審批回調(diào)等等,拿通訊錄回調(diào)來說,就是當你公司組織架構(gòu)發(fā)生變動時,會自動觸發(fā)你自己注冊的回調(diào)地址,然后根據(jù)回調(diào)信息做一些自定義的處理,不得不說,釘釘真的是解決了協(xié)同辦公的很多問題,非常nice,但就回調(diào)事件來說,每個企業(yè)只能注冊一個回調(diào)地址,即使你要監(jiān)聽的是不同的事件、即使還有其他業(yè)務(wù)線需要用到回調(diào),也只能不多于一個回調(diào),當然這都是出于人家服務(wù)多方面考慮的設(shè)計,廢話不多說,
好,流程:
一個注冊端向釘釘發(fā)送注冊請求,釘釘callback你自己的服務(wù)url,必須是公網(wǎng)訪問的地址,并且該地址返回釘釘json數(shù)據(jù)來告訴釘釘回調(diào)成功
注冊成功之后這個地址就可以接收釘釘?shù)幕卣{(diào)post請求,做一些自定義處理,記得返回json
好,代碼
mport requests, json class DingDingCallBack(object): def __init__(self): self.appsecret='' self.appkey='' self.api_url = "https://oapi.dingtalk.com/gettoken?appkey=%s&appsecret=%s"%(self.appkey,self.appsecret) self.aes_key = "" self.callbackUrl = "回調(diào)url" self.headers = { 'Content-Type': 'application/json', 'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.96 Safari/537.36', 'username':'zhangsan', 'password':'123456' } def get_token(self): res = requests.get(self.api_url) if res.status_code == 200: str_res = res.text token = (json.loads(str_res)).get('access_token') return token def regist_call_back(self): url = 'https://oapi.dingtalk.com/call_back/register_call_back?access_token={ACCESS_TOKEN}'.format(ACCESS_TOKEN=self.get_token()) data = { "call_back_tag": ["bpms_task_change", "bpms_instance_change"], # 審批回調(diào) "token": "qxN3cm", "aes_key": self.aes_key, "url":self.callbackUrl, } data1 = json.dumps(data) response = requests.post(url, headers=self.headers, data=data1) print(response) print(response.text) def query_callback(self): url = 'https://oapi.dingtalk.com/call_back/get_call_back?access_token={ACCESS_TOKEN}'.format(ACCESS_TOKEN=self.get_token()) response = requests.get(url, headers=self.headers, ) print(response) print(response.text) def update_call_back(self): url = 'https://oapi.dingtalk.com/call_back/update_call_back?access_token={}'.format(self.get_token()) data = { "call_back_tag": ["bpms_task_change", "bpms_instance_change","user_add_org","user_leave_org","org_dept_create","org_dept_modify","org_dept_remove"], "token": "自定義字符串", "aes_key": self.aes_key, "url":self.callbackUrl } data1 = json.dumps(data) response = requests.post(url, headers=self.headers, data=data1) print(response) print(response.text) def get_fail(self): url = 'https://oapi.dingtalk.com/call_back/get_call_back_failed_result?access_token={}'.format(self.get_token()) response = requests.get(url, headers=self.headers, ) print(response.text)# todo 刪除回調(diào) def delete_callback(self): url = 'https://oapi.dingtalk.com/call_back/delete_call_back?access_token={ACCESS_TOKEN}'.format(ACCESS_TOKEN=self.get_token()) result = requests.get(url) print(result) print(result.text) # if __name__ == '__main__': dingOBJ = DingDingCallBack() # todo 獲取釘釘token # # dingOBJ.get_token() # todo 獲取回調(diào)失敗 # dingOBJ.get_fail() # todo 注冊回調(diào) # dingOBJ.regist_call_back() # todo 查詢回調(diào)事件 dingOBJ.query_callback() # todo 修改回調(diào)事件 # dingOBJ.update_call_back() # todo 刪除回調(diào) # dingOBJ.delete_callback()
# -*- coding:utf-8 -*- from django.shortcuts import render from django.http import JsonResponse,HttpResponse from django.views.generic import View from django.views.decorators.csrf import csrf_exempt from DingCrypto import DingCrypto import random,string,time,json,requests,simplejson # Create your views here.from ast import literal_eval encodingAesKey = '' key = '' token = '自定義字符串' appsecret = '' appkey = '' api_url = "https://oapi.dingtalk.com/gettoken?appkey=%s&appsecret=%s"%(appkey,appsecret) def randam_string(n): ran_str = ''.join(random.sample(string.ascii_letters + string.digits, n)) return ran_str # 注冊回調(diào) @csrf_exempt def workOrderCallback(request): if request.method == 'GET': return JsonResponse({'code':200,'msg':'ok'}) if request.method == 'POST': print request.GET dingCrypto = DingCrypto(encodingAesKey,key) nonce = randam_string(8) timestamp = str(int(round(time.time()))) encrpyt = dingCrypto.encrypt('success') # print nonce,timestamp,token,encrpyt signature = dingCrypto.generateSignature(nonce=nonce,timestamp=timestamp,token=token,msg_encrypt=encrpyt) new_data = { 'data': { 'msg_signature': signature, 'timeStamp': timestamp, 'nonce': nonce, 'encrypt': encrpyt }, } encrpyt11 = dingCrypto.decrypt(encrpyt) return JsonResponse(new_data.get('data')) # 響應(yīng)回調(diào) class CallBack(View): def get(self,request,*args,**kwargs): return JsonResponse({'code':200,'msg':'ok'}) def get_token(self): res = requests.get(api_url) if res.status_code == 200: str_res = res.text token = (json.loads(str_res)).get('access_token') return token def post(self,request,*args,**kwargs): # data = request.GET dingCrypto = DingCrypto(encodingAesKey, key) nonce = randam_string(8) timestamp = str(int(round(time.time()))) encrpyt = dingCrypto.encrypt('success') # signature = dingCrypto.generateSignature(nonce=nonce,timestamp=timestamp,token=token,msg_encrypt=encrpyt) callback = json.loads(dingCrypto.decrypt(json.loads(request.body).get('encrypt'))) if callback['EventType'] == 'bpms_instance_change': # 審批實例開始,結(jié)束 url = 'https://oapi.dingtalk.com/topapi/processinstance/get?access_token={ACCESS_TOKEN}'.format(ACCESS_TOKEN=self.get_token()) instace_ = { "process_instance_id": callback['processInstanceId'] } data2 = json.dumps(instace_) req = requests.post(url,data=data2) data = literal_eval(str(req.text)).get('process_instance') excute_workorder(callback['processInstanceId'],data) elif callback['EventType'] == 'bpms_task_change': # 審批任務(wù)開始,結(jié)束,轉(zhuǎn)交 print 'bpms_task_change' elif callback['EventType'] == 'user_add_org': print '用戶增加'elif callback['EventType'] == 'user_leave_org': print '用戶離職' elif callback['EventType'] == 'org_dept_create': print '組織架構(gòu)添加'elif callback['EventType'] == 'org_dept_modify': print '組織架構(gòu)變更'elif callback['EventType'] == 'org_dept_remove': print '組織架構(gòu)刪除'return HttpResponse(encrpyt)
感謝你能夠認真閱讀完這篇文章,希望小編分享的“python如何實現(xiàn)注冊釘釘回調(diào)事件”這篇文章對大家有幫助,同時也希望大家多多支持億速云,關(guān)注億速云行業(yè)資訊頻道,更多相關(guān)知識等著你來學習!
免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。