|
| 1 | +#!/usr/bin/python |
| 2 | +# encoding: utf-8 |
| 3 | +# -*- coding: utf8 -*- |
| 4 | +""" |
| 5 | +Created by PyCharm. |
| 6 | +File: LinuxBashShellScriptForOps:sendMessageByWeChatWorkApi.py |
| 7 | +User: Guodong |
| 8 | +Create Date: 2017/9/25 |
| 9 | +Create Time: 19:00 |
| 10 | +Description: Refactoring WeChat Message Sender by Python without SQL database |
| 11 | + SQL is not required, using filesystem persistent storage as database, |
| 12 | + data are stored in pickled objects, fetch data as access a python dict rather than a SQL database. |
| 13 | + So far, I did not find the limits about message count can be sent on wechat official website. |
| 14 | +References: |
| 15 | + [微信公众平台-企业号开发者中心-接口文档](http://qydev.weixin.qq.com/wiki/index.php?title=%E9%A6%96%E9%A1%B5) |
| 16 | + [发送接口说明](http://qydev.weixin.qq.com/wiki/index.php?title=%E5%8F%91%E9%80%81%E6%8E%A5%E5%8F%A3%E8%AF%B4%E6%98%8E) |
| 17 | + update@202104 |
| 18 | + [企业微信发送应用消息](https://work.weixin.qq.com/api/doc/90000/90135/90236) |
| 19 | +Prerequisites: [six, shelve, json, requests] |
| 20 | + """ |
| 21 | +import datetime |
| 22 | +import json |
| 23 | +import os |
| 24 | +import shelve |
| 25 | +import sys |
| 26 | + |
| 27 | +import requests |
| 28 | +import six |
| 29 | + |
| 30 | + |
| 31 | +class WeChatMessageSender(object): |
| 32 | + def __init__(self, cid, secret): |
| 33 | + self.corpid = cid |
| 34 | + self.corpsecret = secret |
| 35 | + |
| 36 | + # please set this value into abs path, such as `/usr/lib/zabbix/alertscripts/wechat.db` when using zabbix. |
| 37 | + self.database = 'wechat.db' |
| 38 | + |
| 39 | + self.requested_time = None |
| 40 | + |
| 41 | + self.valid_data = self.load_data() |
| 42 | + self.access_token = self.valid_data['access_token'] |
| 43 | + |
| 44 | + self.message_data = dict() |
| 45 | + |
| 46 | + def get_token(self): |
| 47 | + url = "https://qyapi.weixin.qq.com/cgi-bin/gettoken" |
| 48 | + |
| 49 | + querystring = { |
| 50 | + "corpid": self.corpid, |
| 51 | + "corpsecret": self.corpsecret |
| 52 | + } |
| 53 | + |
| 54 | + headers = { |
| 55 | + 'cache-control': "no-cache", |
| 56 | + } |
| 57 | + |
| 58 | + response = requests.request("GET", url, headers=headers, params=querystring) |
| 59 | + |
| 60 | + self.requested_time = datetime.datetime.now() |
| 61 | + |
| 62 | + response_data = response.text |
| 63 | + |
| 64 | + if response_data: |
| 65 | + return response_data |
| 66 | + else: |
| 67 | + return None |
| 68 | + |
| 69 | + def parse_token(self): |
| 70 | + data_type_json = self.get_token() |
| 71 | + if data_type_json != "" and data_type_json is not None: |
| 72 | + data_type_dict = json.loads(data_type_json) |
| 73 | + |
| 74 | + if data_type_dict['errcode'] == 0 or data_type_dict['errmsg'] == 'ok': |
| 75 | + return data_type_dict |
| 76 | + else: |
| 77 | + return None |
| 78 | + |
| 79 | + def retrieve_data(self): |
| 80 | + persistent_object = shelve.open(self.database) |
| 81 | + data = self.parse_token() |
| 82 | + if data is not None: |
| 83 | + persistent_object['access_token'] = data['access_token'] |
| 84 | + persistent_object['expires_in'] = data['expires_in'] |
| 85 | + persistent_object['requested_time'] = self.requested_time |
| 86 | + expire_time = self.requested_time + datetime.timedelta(seconds=data['expires_in']) |
| 87 | + persistent_object['expires_on'] = str(expire_time) |
| 88 | + persistent_object['is_expired'] = False |
| 89 | + |
| 90 | + self.valid_data = dict(persistent_object) |
| 91 | + persistent_object.close() |
| 92 | + |
| 93 | + def validate_data(self): |
| 94 | + if not os.path.exists(self.database): |
| 95 | + self.retrieve_data() |
| 96 | + |
| 97 | + persistent_object = shelve.open(self.database) |
| 98 | + data = dict(persistent_object) |
| 99 | + persistent_object.close() |
| 100 | + |
| 101 | + expire_time = datetime.datetime.strptime(data['expires_on'], '%Y-%m-%d %H:%M:%S.%f') |
| 102 | + now_time = datetime.datetime.now() |
| 103 | + if now_time > expire_time: |
| 104 | + self.retrieve_data() |
| 105 | + |
| 106 | + def load_data(self): |
| 107 | + self.validate_data() |
| 108 | + |
| 109 | + persistent_object = shelve.open(self.database) |
| 110 | + data = dict(persistent_object) |
| 111 | + persistent_object.close() |
| 112 | + return data |
| 113 | + |
| 114 | + def get_access_token(self): |
| 115 | + # test purpose |
| 116 | + return self.valid_data |
| 117 | + |
| 118 | + def reset_data(self): |
| 119 | + # test purpose for further use |
| 120 | + if os.path.exists(self.database): |
| 121 | + os.remove(self.database) |
| 122 | + self.get_access_token() |
| 123 | + |
| 124 | + def sender_config(self, content, to_user="", to_party="", to_tag="", msg_type="text", agent_id=1, safe=1): |
| 125 | + """ |
| 126 | +
|
| 127 | + :param content: |
| 128 | + text: 消息内容,最长不超过2048个字节,注意:主页型应用推送的文本消息在微信端最多只显示20个字(包含中英文) |
| 129 | + :type content: |
| 130 | + :param to_user: 成员名称使用该成员在企业微信中的账号名(并非显示名称),账号名可以在企业微信后台查到 |
| 131 | + 成员ID列表(消息接收者,多个接收者用‘|’分隔,最多支持1000个)。 |
| 132 | + 特殊情况:指定为@all,则向关注该企业应用的全部成员发送 |
| 133 | + :type to_user: str |
| 134 | + :param to_party:部门ID列表,多个接收者用‘|’分隔,最多支持100个。当touser为@all时忽略本参数 |
| 135 | + :type to_party: |
| 136 | + :param to_tag:标签ID列表,多个接收者用‘|’分隔,最多支持100个。当touser为@all时忽略本参数 |
| 137 | + :type to_tag: |
| 138 | + :param msg_type:消息类型,此时固定为:text (支持消息型应用跟主页型应用), 支持news, file, voice |
| 139 | + :type msg_type: |
| 140 | + :param agent_id:企业应用的id,整型。可在应用的设置页面查看 |
| 141 | + :type agent_id: |
| 142 | + :param safe: 表示是否是保密消息,0表示否,1表示是,默认0 |
| 143 | + :type safe: |
| 144 | + :return: |
| 145 | + :rtype: |
| 146 | + """ |
| 147 | + # parameters instruction see this URL as follows. |
| 148 | + # http://qydev.weixin.qq.com/wiki/index.php?title=%E6%B6%88%E6%81%AF%E7%B1%BB%E5%9E%8B%E5%8F%8A%E6%95%B0%E6%8D%AE%E6%A0%BC%E5%BC%8F |
| 149 | + self.message_data['touser'] = to_user |
| 150 | + self.message_data['toparty'] = to_party |
| 151 | + self.message_data['totag'] = to_tag |
| 152 | + self.message_data['msgtype'] = msg_type |
| 153 | + self.message_data['agentid'] = agent_id |
| 154 | + if msg_type == "text": |
| 155 | + self.message_data['text'] = dict() |
| 156 | + self.message_data['text']['content'] = content |
| 157 | + elif msg_type == "markdown": # qa failed, ui not support |
| 158 | + self.message_data['markdown'] = dict() |
| 159 | + self.message_data['markdown']['content'] = content |
| 160 | + self.message_data['safe'] = safe |
| 161 | + |
| 162 | + def send(self, debug=False): |
| 163 | + """ |
| 164 | + 该方法不同于企业微信机器人,此方法是将信息通过企业微信管理后台中"应用管理"中创建的"应用", |
| 165 | + 发送给该应用中的成员,成员名称使用该成员在企业微信中的账号名(并非显示名称),账号名可以在企业微信后台查到 |
| 166 | + :return: |
| 167 | + :rtype: |
| 168 | + """ |
| 169 | + url = "https://qyapi.weixin.qq.com/cgi-bin/message/send" |
| 170 | + |
| 171 | + if debug: |
| 172 | + querystring = { |
| 173 | + "access_token": self.access_token, |
| 174 | + "debug": "1" |
| 175 | + } |
| 176 | + else: |
| 177 | + querystring = {"access_token": self.access_token} |
| 178 | + |
| 179 | + if six.PY2: |
| 180 | + payload = json.dumps(self.message_data, encoding='utf-8', ensure_ascii=False) |
| 181 | + else: |
| 182 | + payload = json.dumps(self.message_data, ensure_ascii=False) |
| 183 | + |
| 184 | + headers = { |
| 185 | + 'content-type': "application/json", |
| 186 | + 'cache-control': "no-cache", |
| 187 | + } |
| 188 | + |
| 189 | + response = requests.request("POST", url, data=payload, headers=headers, params=querystring) |
| 190 | + print(response.text) |
| 191 | + |
| 192 | + |
| 193 | +def usage(): |
| 194 | + print(""" |
| 195 | + Function: send markdown message to somebody using wechat work API |
| 196 | + Usage: python %s <username> <subject> <message body> |
| 197 | + Zabbix setting: 'Administration' -> 'Media types' |
| 198 | + https://hostname/zabbix.php?action=mediatype.edit&mediatypeid=4 |
| 199 | + Script parameters: {ALERT.SENDTO} {ALERT.SUBJECT} {ALERT.MESSAGE} |
| 200 | + Example: python %s "username" "Test email from Python" "Python rules them all!" |
| 201 | +""" % (__file__, sys.argv[0])) |
| 202 | + sys.exit(0) |
| 203 | + |
| 204 | + |
| 205 | +if __name__ == '__main__': |
| 206 | + # [企业微信管理后台](https://work.weixin.qq.com/wework_admin/) |
| 207 | + CorpID = 'your corp id' # 企业ID,可通过企业微信后台"我的企业"|"企业ID"查看 |
| 208 | + Secret = 'app secret' # 企业微信中自建企业应用的Secret |
| 209 | + AgentID = 1000010 # 企业微信中自建企业应用的AgentId |
| 210 | + # ToUser = 'DingGuodong' # 企业微信企业成员的账号名(非显示名) |
| 211 | + |
| 212 | + argv_length = len(sys.argv) |
| 213 | + if not (argv_length == 1 or argv_length == 4): |
| 214 | + print("bad call") |
| 215 | + if argv_length == 1: |
| 216 | + subject = "Test Message" |
| 217 | + message = "Test message sent by Python over Wechat Work" |
| 218 | + to_user = 'DingGuodong' # if there are more than one phone number to at, use space spilt them |
| 219 | + # at_all = False |
| 220 | + else: |
| 221 | + to_user = sys.argv[1] |
| 222 | + subject = sys.argv[2] |
| 223 | + message = sys.argv[3] |
| 224 | + if subject == "": |
| 225 | + subject = "empty subject." |
| 226 | + if message == "": |
| 227 | + message = "empty message." |
| 228 | + |
| 229 | + basic_msg_template = """ |
| 230 | +### {subject} |
| 231 | +{message} |
| 232 | +""".format(subject=subject, message=message).strip() |
| 233 | + |
| 234 | + w = WeChatMessageSender(CorpID, Secret) |
| 235 | + w.sender_config(basic_msg_template, to_user=to_user, agent_id=AgentID, msg_type="markdown", safe=0) |
| 236 | + w.send() |
0 commit comments