ABC学习网

python注册钉钉回调事件的实现

热度:5℃发布时间:2023-12-29 11:58:55
目录1、注册端2、回调端:以下示例代码为python2,django 框架

钉钉API文档:https://ding-doc.dingtalk.com/doc#/serverapi2/skn8ld

钉钉有回调事件流程,有哪些回调?比如:通讯录回调、审批回调等等,拿通讯录回调来说,就是当你公司组织架构发生变动时,会自动触发你自己注册的回调地址,然后根据回调信息做一些自定义的处理,不得不说,钉钉真的是解决了协同办公的很多问题,非常nice,但就回调事件来说,每个企业只能注册一个回调地址,即使你要监听的是不同的事件、即使还有其他业务线需要用到回调,也只能不多于一个回调,当然这都是出于人家服务多方面考虑的设计,废话不多说,

好,流程:

一个注册端向钉钉发送注册请求,钉钉callback你自己的服务url,必须是公网访问的地址,并且该地址返回钉钉json数据来告诉钉钉回调成功

注册成功之后这个地址就可以接收钉钉的回调post请求,做一些自定义处理,记得返回json

好,代码

1、注册端

mport requests, jsonclass DingDingCallBack(object): def __init__(self): self.appsecret=’’ self.appkey=’’ self.api_url = "https://oapi.dingtalk.com/gettoken?appkey=%s&appsecret=%s"%(self.appkey,self.appsecret) self.aes_key = "" self.callbackUrl = "回调url" self.headers = { ’Content-Type’: ’application/json’, ’User-Agent’: ’Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.96 Safari/537.36’, ’username’:’zhangsan’, ’password’:’123456’ } def get_token(self): res = requests.get(self.api_url) if res.status_code == 200: str_res = res.text token = (json.loads(str_res)).get(’access_token’) return token def regist_call_back(self): url = ’https://oapi.dingtalk.com/call_back/register_call_back?access_token={ACCESS_TOKEN}’.format(ACCESS_TOKEN=self.get_token()) data = { "call_back_tag": ["bpms_task_change", "bpms_instance_change"], # 审批回调 "token": "qxN3cm", "aes_key": self.aes_key, "url":self.callbackUrl, } data1 = json.dumps(data) response = requests.post(url, headers=self.headers, data=data1) print(response) print(response.text) def query_callback(self): url = ’https://oapi.dingtalk.com/call_back/get_call_back?access_token={ACCESS_TOKEN}’.format(ACCESS_TOKEN=self.get_token()) response = requests.get(url, headers=self.headers, ) print(response) print(response.text) def update_call_back(self): url = ’https://oapi.dingtalk.com/call_back/update_call_back?access_token={}’.format(self.get_token()) data = { "call_back_tag": ["bpms_task_change", "bpms_instance_change","user_add_org","user_leave_org","org_dept_create","org_dept_modify","org_dept_remove"], "token": "自定义字符串", "aes_key": self.aes_key, "url":self.callbackUrl } data1 = json.dumps(data) response = requests.post(url, headers=self.headers, data=data1) print(response) print(response.text) def get_fail(self): url = ’https://oapi.dingtalk.com/call_back/get_call_back_failed_result?access_token={}’.format(self.get_token()) response = requests.get(url, headers=self.headers, ) print(response.text)# todo 删除回调 def delete_callback(self): url = ’https://oapi.dingtalk.com/call_back/delete_call_back?access_token={ACCESS_TOKEN}’.format(ACCESS_TOKEN=self.get_token()) result = requests.get(url) print(result) print(result.text)#if __name__ == ’__main__’: dingOBJ = DingDingCallBack() # todo 获取钉钉token# # dingOBJ.get_token() # todo 获取回调失败# dingOBJ.get_fail() # todo 注册回调 # dingOBJ.regist_call_back() # todo 查询回调事件 dingOBJ.query_callback() # todo 修改回调事件# dingOBJ.update_call_back() # todo 删除回调 # dingOBJ.delete_callback()

2、回调端:以下示例代码为python2,django 框架

# -*- coding:utf-8 -*-from django.shortcuts import renderfrom django.http import JsonResponse,HttpResponsefrom django.views.generic import Viewfrom django.views.decorators.csrf import csrf_exemptfrom DingCrypto import DingCryptoimport random,string,time,json,requests,simplejson# Create your views here.from ast import literal_evalencodingAesKey = ’’key = ’’token = ’自定义字符串’appsecret = ’’appkey = ’’api_url = "https://oapi.dingtalk.com/gettoken?appkey=%s&appsecret=%s"%(appkey,appsecret)def randam_string(n): ran_str = ’’.join(random.sample(string.ascii_letters + string.digits, n)) return ran_str# 注册回调@csrf_exemptdef workOrderCallback(request): if request.method == ’GET’: return JsonResponse({’code’:200,’msg’:’ok’}) if request.method == ’POST’: print request.GET dingCrypto = DingCrypto(encodingAesKey,key) nonce = randam_string(8) timestamp = str(int(round(time.time()))) encrpyt = dingCrypto.encrypt(’success’) # print nonce,timestamp,token,encrpyt signature = dingCrypto.generateSignature(nonce=nonce,timestamp=timestamp,token=token,msg_encrypt=encrpyt) new_data = { ’data’: { ’msg_signature’: signature, ’timeStamp’: timestamp, ’nonce’: nonce, ’encrypt’: encrpyt }, } encrpyt11 = dingCrypto.decrypt(encrpyt) return JsonResponse(new_data.get(’data’))# 响应回调class CallBack(View): def get(self,request,*args,**kwargs): return JsonResponse({’code’:200,’msg’:’ok’}) def get_token(self): res = requests.get(api_url) if res.status_code == 200: str_res = res.text token = (json.loads(str_res)).get(’access_token’) return token def post(self,request,*args,**kwargs): # data = request.GET dingCrypto = DingCrypto(encodingAesKey, key) nonce = randam_string(8) timestamp = str(int(round(time.time()))) encrpyt = dingCrypto.encrypt(’success’) # signature = dingCrypto.generateSignature(nonce=nonce,timestamp=timestamp,token=token,msg_encrypt=encrpyt) callback = json.loads(dingCrypto.decrypt(json.loads(request.body).get(’encrypt’))) if callback[’EventType’] == ’bpms_instance_change’: # 审批实例开始,结束 url = ’https://oapi.dingtalk.com/topapi/processinstance/get?access_token={ACCESS_TOKEN}’.format(ACCESS_TOKEN=self.get_token()) instace_ = { "process_instance_id": callback[’processInstanceId’] } data2 = json.dumps(instace_) req = requests.post(url,data=data2) data = literal_eval(str(req.text)).get(’process_instance’) excute_workorder(callback[’processInstanceId’],data) elif callback[’EventType’] == ’bpms_task_change’: # 审批任务开始,结束,转交 print ’bpms_task_change’ elif callback[’EventType’] == ’user_add_org’: print ’用户增加’elif callback[’EventType’] == ’user_leave_org’: print ’用户离职’ elif callback[’EventType’] == ’org_dept_create’: print ’组织架构添加’elif callback[’EventType’] == ’org_dept_modify’: print ’组织架构变更’elif callback[’EventType’] == ’org_dept_remove’: print ’组织架构删除’return HttpResponse(encrpyt)

到此这篇关于python注册钉钉回调事件的实现的文章就介绍到这了,更多相关python注册钉钉回调事件内容请搜索ABC学习网以前的文章或继续浏览下面的相关文章希望大家以后多多支持ABC学习网!

(window.slotbydup = window.slotbydup || []).push({id: "u6915441",container: "_5rmj5io5v3i",async: true});
网友评论
评论
发 布

更多编程教程
  • 编程教程推荐
更多+