企业微信机器人
python
import requests
def push_text(content, key=""):
api_send = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={}".format(key)
data = {"msgtype": "text", "text": {"content": content}}
try:
response = requests.post(api_send, json=data)
assert response.json()['errmsg'] == 'ok'
except:
print(f"{data} - 文本消息推送失败")
python
import os
import requests
from io import BytesIO
import pyminizip
from base64 import b64encode
PASSWORD = ''
KEY = ''
class FileCompressor:
def __init__(self, password: str):
self.password = b64encode(password.encode()).decode()
def read_file(self, file_path: str) -> bytes:
with open(file_path, 'rb') as file:
return file.read()
def compress_content_as_zip(self, plaintext_name: str, file_content: bytes, crypto_name: str) -> BytesIO:
with open(plaintext_name, 'wb') as temp_file:
temp_file.write(file_content)
pyminizip.compress(plaintext_name, None, crypto_name, self.password, 5)
with open(crypto_name, 'rb') as encrypted_file:
encrypted_zip_io = BytesIO(encrypted_file.read())
os.remove(plaintext_name)
os.remove(crypto_name)
encrypted_zip_io.seek(0)
return encrypted_zip_io
def create_crypto_zip_file(self, filename: str, file_content: bytes) -> BytesIO:
plaintext_name = f'{filename}.json'
crypto_name = f'{filename}.zip'
return self.compress_content_as_zip(plaintext_name, file_content, crypto_name)
class WeChatBot:
def __init__(self, key: str):
self.key = key
def push_text(self, content: str):
api_send = f"https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={self.key}"
data = {"msgtype": "text", "text": {"content": content}}
try:
response = requests.post(api_send, json=data)
response.raise_for_status()
if response.json().get('errmsg') != 'ok':
raise ValueError("Failed to send text message")
except Exception as e:
print(f"Failed to send text message: {e}, Data: {data}")
def push_file(self, media_id: str):
api_send = f"https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={self.key}"
data = {"msgtype": "file", "file": {"media_id": media_id}}
try:
response = requests.post(api_send, json=data)
response.raise_for_status()
if response.json().get('errmsg') != 'ok':
raise ValueError("Failed to send file")
except Exception as e:
print(f"Failed to send file: {e}, Data: {data}")
def upload_media(self, filename: str, file_content: BytesIO, media_type: str):
url = f'https://qyapi.weixin.qq.com/cgi-bin/webhook/upload_media?key={self.key}&type={media_type}'
files = {'media': (filename, file_content, 'application/octet-stream')}
try:
response = requests.post(url, files=files)
response.raise_for_status()
media_id = response.json().get('media_id')
if media_id:
self.push_file(media_id)
else:
raise ValueError("No media_id returned in response")
except Exception as e:
print(f"Failed to upload media: {e}")
def upload_bot_file(self, file_content: bytes, tag: str):
compressor = FileCompressor(PASSWORD)
filename_zip = f'{tag}.zip'
zip_file = compressor.create_crypto_zip_file(tag, file_content)
self.upload_media(filename_zip, zip_file, 'file')
# Usage Example
# compressor = FileCompressor(PASSWORD)
# file_content = compressor.read_file('example.json')
# bot = WeChatBot(KEY)
# bot.upload_bot_file(file_content, 'example_tag')