tencent cloud

消息队列 MQTT 版

动态与公告
新功能发布记录
产品简介
TDMQ 产品系列介绍与选型
什么是消息队列 MQTT 版
应用场景
技术架构
产品系列
MQTT 协议兼容说明
开源对比
高可用
产品约束与使用配额
基本概念
开服地域
购买指南
计费概述
续费说明
查看消费明细
欠费说明
退费说明
快速入门
入门流程指引
准备工作
公网接入
VPC 网络接入
用户指南
使用流程指引
配置账号权限
新建集群
管理 Topic
连接集群
查询消息
管理客户端
管理集群
查看监控和配置告警
数据集成
集成数据到云函数 SCF
集成数据到 CKafka
集成数据到 RocketMQ
开发指南
MQTT 5 高级特性
数据面 HTTP 接口说明
配置自定义域名
配置 SQL 过滤
配置点对点订阅
MQTT over QUIC
管理客户端订阅
消息增强规则
实践教程
MQTT 客户端开发注意事项
可观测能力
Topic 与通配符订阅
API 参考
History
Introduction
API Category
Making API Requests
Cluster APIs
Topic APIs
Authorization Policy APIs
User APIs
Client APIs
Message Enhancement Rule APIs
Message APIs
Data Types
Error Codes
SDK 参考
接入点格式
Java SDK
C SDK
Javascript/Node.JS/小程序
Go SDK
iOS SDK
JavaScript SDK
Dart SDK
Python SDK
.NET
安全与合规
权限管理
常见问题
相关协议
隐私协议
数据处理和安全协议
消息队列 MQTT 版服务等级协议
联系我们

配置点对点订阅

PDF
聚焦模式
字号
最后更新时间: 2026-01-30 15:23:30

场景说明

除了 MQTT 标准协议定义的发布/订阅(Pub/Sub)消息模型外,消息队列 MQTT 版还支持点对点(Point to Point,简称 P2P)模式。
说明:
当前功能灰度开启中,如您需要使用,请联系我们

什么是 P2P 消息模式?

当需要发送消息给指定的一个消费者时,可以使用点对点消息模式。对比主要提供单个 Publisher 多个 Subscriber 的 1:N,或者 M:N 场景方案的 Pub/Sub 消息模式, P2P 消息模式提供了高效的点对点通信方案。
使用 P2P 模式时,Publisher 已经明确该消息的目标接收者信息,并且该消息只需要被指定的单一客户端消费。发送者发送消息时通过设置符合命名规则的 Topic 指定接收者,接收者无需提前订阅即可消费到该消息。
使用 P2P 模式不仅可以节省接收者注册订阅关系的成本,还可以降低推送延迟。

发布 P2P 消息

String firstTopic = ...;
String targetClientId = ...;
String topic = firstTopic + "/p2p/" + targetClientId;
MqttMessage message = ...;
mqttClient.publish(topic, message);

订阅 P2P 消息

接收消息的客户端无需任何订阅处理,正确初始化和连接到集群的目标客户端即可收到 P2P 消息。

Paho Golang SDK 使用注意事项

Paho Golang SDK 在收到消息后, 会根据收到消息的 Topic 进行 matchAndDispatch
// matchAndDispatch takes a channel of Message pointers as input and starts a go routine that
// takes messages off the channel, matches them against the internal route list and calls the
// associated callback (or the defaultHandler, if one exists and no other route matched). If
// anything is sent down the stop channel the function will end.
func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order bool, client *client) <-chan *PacketAndToken {
var wg sync.WaitGroup
ackOutChan := make(chan *PacketAndToken) // Channel returned to caller; closed when messages channel closed
var ackInChan chan *PacketAndToken // ACKs generated by ackFunc get put onto this channel

stopAckCopy := make(chan struct{}) // Closure requests stop of go routine copying ackInChan to ackOutChan
ackCopyStopped := make(chan struct{}) // Closure indicates that it is safe to close ackOutChan
goRoutinesDone := make(chan struct{}) // closed on wg.Done()
if order {
ackInChan = ackOutChan // When order = true no go routines are used so safe to use one channel and close when done
} else {
// When order = false ACK messages are sent in go routines so ackInChan cannot be closed until all goroutines done
ackInChan = make(chan *PacketAndToken)
go func() { // go routine to copy from ackInChan to ackOutChan until stopped
for {
select {
case a := <-ackInChan:
ackOutChan <- a
case <-stopAckCopy:
close(ackCopyStopped) // Signal main go routine that it is safe to close ackOutChan
for {
select {
case <-ackInChan: // drain ackInChan to ensure all goRoutines can complete cleanly (ACK dropped)
DEBUG.Println(ROU, "matchAndDispatch received acknowledgment after processing stopped (ACK dropped).")
case <-goRoutinesDone:
close(ackInChan) // Nothing further should be sent (a panic is probably better than silent failure)
DEBUG.Println(ROU, "matchAndDispatch order=false copy goroutine exiting.")
return
}
}
}
}
}()
}

go func() { // Main go routine handling inbound messages
for message := range messages {
// DEBUG.Println(ROU, "matchAndDispatch received message")
sent := false
r.RLock()
m := messageFromPublish(message, ackFunc(ackInChan, client.persist, message))
var handlers []MessageHandler
for e := r.routes.Front(); e != nil; e = e.Next() {
if e.Value.(*route).match(message.TopicName) {
if order {
handlers = append(handlers, e.Value.(*route).callback)
} else {
hd := e.Value.(*route).callback
wg.Add(1)
go func() {
hd(client, m)
if !client.options.AutoAckDisabled {
m.Ack()
}
wg.Done()
}()
}
sent = true
}
}
if !sent {
if r.defaultHandler != nil {
if order {
handlers = append(handlers, r.defaultHandler)
} else {
wg.Add(1)
go func() {
r.defaultHandler(client, m)
if !client.options.AutoAckDisabled {
m.Ack()
}
wg.Done()
}()
}
} else {
DEBUG.Println(ROU, "matchAndDispatch received message and no handler was available. Message will NOT be acknowledged.")
}
}
r.RUnlock()
for _, handler := range handlers {
handler(client, m)
if !client.options.AutoAckDisabled {
m.Ack()
}
}
// DEBUG.Println(ROU, "matchAndDispatch handled message")
}
if order {
close(ackOutChan)
} else { // Ensure that nothing further will be written to ackOutChan before closing it
close(stopAckCopy)
<-ackCopyStopped
close(ackOutChan)
go func() {
wg.Wait() // Note: If this remains running then the user has handlers that are not returning
close(goRoutinesDone)
}()
}
DEBUG.Println(ROU, "matchAndDispatch exiting")
}()
return ackOutChan
}
收到的消息可能不匹配任何一个 topic-filter,为了能正确处理这类消息,需要配置 defaultHandler:ClientOptions:SetDefaultPublishHandler(messagePubHandler)使 fallback 逻辑符合预期
package main

import (
"fmt"
mqtt "github.com/eclipse/paho.mqtt.golang"
"time"
)

var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
fmt.Printf("Received message: %s from topic: %s\\n", msg.Payload(), msg.Topic())
}

var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
fmt.Println("Connected")
}

var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
fmt.Printf("Connect lost: %v", err)
}

func sub(client mqtt.Client) {
topic := "home/room"
// Message handler per topic-filter
token := client.Subscribe(topic, 1, messagePubHandler)
result := token.Wait()
fmt.Printf("Subscribed to topic %s, %s", topic, result)
}

func publish(client mqtt.Client) {
num := 10
for i := 0; i < num; i++ {
text := fmt.Sprintf("Message %d", i)
token := client.Publish("home/test", 0, false, text)
token.Wait()
time.Sleep(time.Second)
}
}

func main() {
// Acquire your instance access point from MQTT Console
var broker = "mqtt-xxx.mqtt.tencenttdmq.com"
var port = 1883
opts := mqtt.NewClientOptions()
opts.AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port))
opts.SetClientID("mqttGolangClient")
opts.SetUsername("YOUR-USERNAME")
opts.SetPassword("YOUR-PASSWORD")
// Need to configure defaultHandler to make P2P message fallback properly
opts.SetDefaultPublishHandler(messagePubHandler)
opts.OnConnect = connectHandler
opts.OnConnectionLost = connectLostHandler
client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
}
sub(client)
publish(client)
time.Sleep(time.Minute * 3)
}


帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈