Skip to content

企业微信机器人

python
import requests

def push_text(content, key=""):
    api_send = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={}".format(key)
    data = {"msgtype": "text", "text": {"content": content}}
    try:
        response = requests.post(api_send, json=data)
        assert response.json()['errmsg'] == 'ok'
    except:
        print(f"{data} - 文本消息推送失败")
python
import os
import requests
from io import BytesIO
import pyminizip
from base64 import b64encode

PASSWORD = ''
KEY = ''

class FileCompressor:
    def __init__(self, password: str):
        self.password = b64encode(password.encode()).decode()

    def read_file(self, file_path: str) -> bytes:
        with open(file_path, 'rb') as file:
            return file.read()

    def compress_content_as_zip(self, plaintext_name: str, file_content: bytes, crypto_name: str) -> BytesIO:
        with open(plaintext_name, 'wb') as temp_file:
            temp_file.write(file_content)
        
        pyminizip.compress(plaintext_name, None, crypto_name, self.password, 5)
        
        with open(crypto_name, 'rb') as encrypted_file:
            encrypted_zip_io = BytesIO(encrypted_file.read())
        
        os.remove(plaintext_name)
        os.remove(crypto_name)
        
        encrypted_zip_io.seek(0)
        return encrypted_zip_io

    def create_crypto_zip_file(self, filename: str, file_content: bytes) -> BytesIO:
        plaintext_name = f'{filename}.json'
        crypto_name = f'{filename}.zip'
        return self.compress_content_as_zip(plaintext_name, file_content, crypto_name)

class WeChatBot:
    def __init__(self, key: str):
        self.key = key

    def push_text(self, content: str):
        api_send = f"https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={self.key}"
        data = {"msgtype": "text", "text": {"content": content}}
        try:
            response = requests.post(api_send, json=data)
            response.raise_for_status()
            if response.json().get('errmsg') != 'ok':
                raise ValueError("Failed to send text message")
        except Exception as e:
            print(f"Failed to send text message: {e}, Data: {data}")

    def push_file(self, media_id: str):
        api_send = f"https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={self.key}"
        data = {"msgtype": "file", "file": {"media_id": media_id}}
        try:
            response = requests.post(api_send, json=data)
            response.raise_for_status()
            if response.json().get('errmsg') != 'ok':
                raise ValueError("Failed to send file")
        except Exception as e:
            print(f"Failed to send file: {e}, Data: {data}")

    def upload_media(self, filename: str, file_content: BytesIO, media_type: str):
        url = f'https://qyapi.weixin.qq.com/cgi-bin/webhook/upload_media?key={self.key}&type={media_type}'
        files = {'media': (filename, file_content, 'application/octet-stream')}
        
        try:
            response = requests.post(url, files=files)
            response.raise_for_status()
            media_id = response.json().get('media_id')
            if media_id:
                self.push_file(media_id)
            else:
                raise ValueError("No media_id returned in response")
        except Exception as e:
            print(f"Failed to upload media: {e}")

    def upload_bot_file(self, file_content: bytes, tag: str):
        compressor = FileCompressor(PASSWORD)
        filename_zip = f'{tag}.zip'
        zip_file = compressor.create_crypto_zip_file(tag, file_content)
        self.upload_media(filename_zip, zip_file, 'file')

# Usage Example
# compressor = FileCompressor(PASSWORD)
# file_content = compressor.read_file('example.json')

# bot = WeChatBot(KEY)
# bot.upload_bot_file(file_content, 'example_tag')

群机器人配置说明

Released under the MIT License.