发送复审
"topic": "ks/llm/req/text"
需要将以下json格式转成MessagePack格式的字节串,进行发送
{
"msg_id": "自动生成的ID", # 通过get_object_id()生成,可自定义传入uuid
"msg_type": "llm_req", # 固定值
"time": "当前时间字符串", # 格式: YYYY-MM-DD HH:MM:SS.ssssss
"data": {
"id": "自动生成的ID", # 通过get_object_id()生成,同msg_id值
"urgent": true, # 是否紧急请求,为true则优先处理
"content": [
{
"text": "输入的文本",
"image": "base64编码的图片数据"
}
],
"reserved_data": {
"task_type": "llm_chat" # 固定值,
"任意字段":"任意值", # 响应会原封不动返回,可以传入正则表达式,从响应值直接获取进行匹配
}
}
}
接收大模型结果
"topic":"ks/llm/resp/text"
订阅ks/llm/resp/text
主题,接收MessagePack格式字节串`,下方为转换为json
{
"msg_id": "90269691-7c5a-40ca-9aba-bde81557a9e1", # 发送的msg_id
"msg_type": "llm_resp", # 固定值
"time": "2025-07-04 13:56:05.750815",
"data": {
"id": "90269691-7c5a-40ca-9aba-bde81557a9e1", # 发送的msg_id
"urgent": true,
"content": [
"大模型输出内容。"
],
"reserved_data": {
"task_type": "llm_chat",
"任意字段":"任意值", # 从请求中的参数原封不动返回,多用于正则匹配
}
}
}
Request代码示例代码
import json
import paho.mqtt.client as mqtt
import msgpack
import datetime
import uuid
import base64
import time
# 连接参数
broker = '192.168.1.88'
port = 1883
client_id = 'test_client'
# 回调函数 - 适配VERSION2
def on_connect(client, userdata, flags, reason_code, properties):
if reason_code == 0:
print('成功连接到MQTT Broker!')
else:
print(f'连接失败,错误代码: {reason_code}')
def on_publish(client, userdata, mid, reason_code=None, properties=None):
print(f'消息已发布,消息ID: {mid}, 原因代码: {reason_code}')
# 构造消息
def generate_llm_chat_message(text, image_base64=None):
message_id = str(uuid.uuid4())
return {
'msg_id': message_id,
'msg_type': 'llm_req',
'time': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'),
'data': {
'id': message_id,
'urgent': True,
'content': [
{
'text': text,
'image': image_base64 if image_base64 else ''
}
],
'reserved_data': {
'task_type': 'llm_chat',
'match': 'test'
}
}
}
try:
# 创建客户端
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id=client_id)
# 设置回调函数
client.on_connect = on_connect
client.on_publish = on_publish
# 连接broker
print('正在尝试连接到MQTT Broker...')
client.connect(broker, port)
# 启动网络循环
client.loop_start()
# 构造消息内容
text = '''
图片中有人没带安全帽?
选项:
A. 没有
B. 有
从上述选项中,选择正确答案,只输出选项。
'''
# 读取同目录的图片并编码成base64
image_file = 'test.jpg'
try:
with open(image_file, 'rb') as f:
image_base64 = base64.b64encode(f.read()).decode('utf-8')
except FileNotFoundError:
print(f'错误: 图片文件 {image_file} 未找到!')
exit(1)
# 生成消息并序列化
msg = generate_llm_chat_message(text, image_base64)
payload = msgpack.dumps(msg)
# 发布消息
print('正在发布消息...')
result = client.publish('ks/llm/req/text', payload, qos=2)
# 等待消息发送完成
result.wait_for_publish()
# 等待足够时间确保消息发送
time.sleep(2)
print('消息发布成功,msg_id: {}'.format(msg['msg_id']))
except Exception as e:
print(f'发生错误: {str(e)}')
finally:
client.loop_stop()
client.disconnect()
print('程序结束')
Response代码示例代码
import paho.mqtt.client as mqtt
import msgpack
import json
import re
from typing import Dict, Any
# MQTT 配置
BROKER = '192.168.1.88' # 替换为你的MQTT Broker地址
PORT = 1883
CLIENT_ID = 'llm_resp_subscriber'
TOPIC = 'ks/llm/resp/text'
def msgpack_to_json(msgpack_data: bytes) -> Dict[str, Any]:
"""将 msgpack 数据转换为 JSON 可序列化的字典"""
return msgpack.loads(msgpack_data, raw=False)
def extract_boolean_from_message(data: Dict[str, Any]) -> bool:
"""
从 data.message 中提取 true/false 使用正则匹配 "true"/"false" (不区分大小写)
""" message = data.get('data', {}).get('message', '').strip() # 去除前后空格
# 定义匹配模式
pattern_no = r'^A\s*\.\s*没有$' # 严格匹配 "A. 没有"
pattern_yes = r'^B\s*\.\s*有$' # 严格匹配 "B. 有"
if re.search(pattern_yes, message, re.IGNORECASE):
return True # 检测到安全帽
elif re.search(pattern_no, message, re.IGNORECASE):
return False # 未检测到安全帽
else:
return False # 默认未检测到
def on_connect(client, userdata, flags, reason_code, properties):
print(f'已连接到MQTT Broker (代码: {reason_code})')
client.subscribe(TOPIC, qos=2)
print(f'已订阅主题: {TOPIC}')
def on_message(client, userdata, msg):
try:
# 1. 解码msgpack
decoded = msgpack_to_json(msg.payload)
# 2. 转换为JSON字符串并打印
json_str = json.dumps(decoded, indent=2, ensure_ascii=False)
print(f'\n收到消息 (主题: {msg.topic}):\n{json_str}')
# 3. 提取true/false结果
result = extract_boolean_from_message(decoded)
print(f'正则提取结果: {result}')
except Exception as e:
print(f'处理消息时出错: {str(e)}')
def main():
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id=CLIENT_ID)
client.on_connect = on_connect
client.on_message = on_message
client.connect(BROKER, PORT)
client.loop_forever()
if __name__ == '__main__':
print(f'启动MQTT订阅客户端,监听主题: {TOPIC}')
main()
回复