
TDMQ for RocketMQ

Using the Go SDK

Last updated: 2026-01-23 17:52:24

Scenarios

This document uses the open-source Go SDK as an example to describe how to send and receive messages, so that you can follow the complete message flow from producer to consumer.

Prerequisites

You have obtained the client connection parameters as instructed in SDK Overview.

Operation Steps

Step 1: Installing the Go Dependency Library

Add the client dependency to your Go project. For example, with go get, run the following command:
go get github.com/apache/rocketmq-clients/golang/v5

Step 2: Producing Messages

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"strconv"
	"time"

	rmq_client "github.com/apache/rocketmq-clients/golang/v5"
	"github.com/apache/rocketmq-clients/golang/v5/credentials"
)

const (
	// Topic: Enter the topic name.
	Topic = "xxxxxx"
	// Endpoint: Enter the access address provided by Tencent Cloud.
	Endpoint = "xxxxxx"
	// AccessKey: Add the configured AccessKey.
	AccessKey = "xxxxxx"
	// SecretKey: Add the configured SecretKey.
	SecretKey = "xxxxxx"
)

func main() {
	// log to console
	os.Setenv("mq.consoleAppender.enabled", "true")
	rmq_client.ResetLogger()
	// In most cases you do not need many producers; reusing a single instance is recommended.
	producer, err := rmq_client.NewProducer(&rmq_client.Config{
		Endpoint: Endpoint,
		Credentials: &credentials.SessionCredentials{
			AccessKey:    AccessKey,
			AccessSecret: SecretKey,
		},
	},
		rmq_client.WithTopics(Topic),
	)
	if err != nil {
		log.Fatal(err)
	}
	// start the producer
	err = producer.Start()
	if err != nil {
		log.Fatal(err)
	}
	// stop the producer gracefully when main returns
	defer producer.GracefulStop()

	for i := 0; i < 10; i++ {
		// create a message
		msg := &rmq_client.Message{
			Topic: Topic,
			Body:  []byte("this is a message : " + strconv.Itoa(i)),
		}
		// set keys and tag
		msg.SetKeys("a", "b")
		msg.SetTag("ab")
		// send the message synchronously
		resp, err := producer.Send(context.TODO(), msg)
		if err != nil {
			log.Fatal(err)
		}
		// print every send receipt returned for this message
		for _, receipt := range resp {
			fmt.Printf("%#v\n", receipt)
		}
		// wait a moment before sending the next message
		time.Sleep(time.Second * 1)
	}
}
Parameter | Description
AccessKey | Role token. You can copy the token from the AccessKey column on the Cluster Permissions page in the console.
SecretKey | Role name. You can copy the role name from the SecretKey column on the Cluster Permissions page in the console.
Endpoint | Cluster access address. You can obtain the access address from the Access Information module on the Cluster Basic Information page in the console.
Topic | Topic name. You can copy the name from the Topic Management page in the console.
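
The sample above hardcodes the four parameters as constants. As a minimal sketch of one alternative, the values could be read from environment variables so that credentials stay out of source control. The ROCKETMQ_* variable names, the connParams type, and the loadConnParams helper are hypothetical names chosen for this illustration; they are not part of the SDK.

```go
package main

import (
	"errors"
	"fmt"
	"os"
)

// connParams groups the four values described in the parameter table.
// This type is an assumption of the sketch, not an SDK type.
type connParams struct {
	Endpoint  string
	AccessKey string
	SecretKey string
	Topic     string
}

// loadConnParams reads the parameters through getenv (os.Getenv in normal use).
// The ROCKETMQ_* variable names are hypothetical.
func loadConnParams(getenv func(string) string) (connParams, error) {
	p := connParams{
		Endpoint:  getenv("ROCKETMQ_ENDPOINT"),
		AccessKey: getenv("ROCKETMQ_ACCESS_KEY"),
		SecretKey: getenv("ROCKETMQ_SECRET_KEY"),
		Topic:     getenv("ROCKETMQ_TOPIC"),
	}
	if p.Endpoint == "" || p.AccessKey == "" || p.SecretKey == "" || p.Topic == "" {
		return connParams{}, errors.New("one or more ROCKETMQ_* environment variables are not set")
	}
	return p, nil
}

func main() {
	p, err := loadConnParams(os.Getenv)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	// Print only non-secret values to confirm what will be used.
	fmt.Printf("endpoint=%s topic=%s\n", p.Endpoint, p.Topic)
}
```

The returned fields can then be passed to rmq_client.Config and credentials.SessionCredentials in place of the constants used in the producer sample.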

Step 3: Consuming Messages

The TDMQ for RocketMQ 5.x series supports two consumption modes: Push Consumer and Simple Consumer.
Note:
The Golang SDK of the Community Edition only supports Simple Consumer.
The following sample code uses Simple Consumer as an example:
package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"

	rmq_client "github.com/apache/rocketmq-clients/golang/v5"
	"github.com/apache/rocketmq-clients/golang/v5/credentials"
)

const (
	// Topic: Enter the topic name.
	Topic = "xxxxxx"
	// ConsumerGroup: Enter the consumer group name.
	ConsumerGroup = "xxxxxx"
	// Endpoint: Enter the access address provided by Tencent Cloud.
	Endpoint = "xxxxxx"
	// AccessKey: Add the configured AccessKey.
	AccessKey = "xxxxxx"
	// SecretKey: Add the configured SecretKey.
	SecretKey = "xxxxxx"
)

var (
	// maximum waiting time for receive func
	awaitDuration = time.Second * 5
	// maximum number of messages received at one time
	maxMessageNum int32 = 16
	// invisible duration of a received message (should be > 20s)
	invisibleDuration = time.Second * 20
	// receive concurrency
	receiveConcurrency = 6
)

func main() {
	// log to console
	os.Setenv("mq.consoleAppender.enabled", "true")
	rmq_client.ResetLogger()
	// In most cases you do not need many consumers; reusing a single instance is recommended.
	simpleConsumer, err := rmq_client.NewSimpleConsumer(&rmq_client.Config{
		Endpoint:      Endpoint,
		ConsumerGroup: ConsumerGroup,
		Credentials: &credentials.SessionCredentials{
			AccessKey:    AccessKey,
			AccessSecret: SecretKey,
		},
	},
		rmq_client.WithAwaitDuration(awaitDuration),
		rmq_client.WithSubscriptionExpressions(map[string]*rmq_client.FilterExpression{
			Topic: rmq_client.SUB_ALL,
		}),
	)
	if err != nil {
		log.Fatal(err)
	}
	// start simpleConsumer
	err = simpleConsumer.Start()
	if err != nil {
		log.Fatal(err)
	}
	// stop simpleConsumer gracefully when main returns
	defer func() {
		if r := recover(); r != nil {
			fmt.Println(r)
		}
		_ = simpleConsumer.GracefulStop()
	}()

	fmt.Println("start receive message")

	// Each Receive call will only select one broker queue to pop messages.
	// Enable multiple consumption goroutines to reduce message end-to-end latency.
	ch := make(chan struct{})
	wg := &sync.WaitGroup{}
	for i := 0; i < receiveConcurrency; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				select {
				case <-ch:
					// stop signal received
					return
				default:
					mvs, err := simpleConsumer.Receive(context.TODO(), maxMessageNum, invisibleDuration)
					if err != nil {
						fmt.Println("receive message error: " + err.Error())
					}
					// ack each received message
					for _, mv := range mvs {
						fmt.Println(mv)
						if err := simpleConsumer.Ack(context.TODO(), mv); err != nil {
							fmt.Println("ack message error: " + err.Error())
						}
					}
				}
			}
		}()
	}

	exit := make(chan os.Signal, 1)
	signal.Notify(exit, syscall.SIGINT, syscall.SIGTERM)

	// wait for exit, then stop the receive goroutines
	<-exit
	close(ch)
	wg.Wait()
}
Parameter | Description
AccessKey | Role token. You can copy the token from the AccessKey column on the Cluster Permissions page in the console.
SecretKey | Role name. You can copy the role name from the SecretKey column on the Cluster Permissions page in the console.
Endpoint | Cluster access address. You can obtain the access address from the Access Information module on the Cluster Basic Information page in the console.
ConsumerGroup | Consumer group name. You can copy the name from the Group Management page in the console.
Topic | Topic name. You can copy the name from the Topic Management page in the console.
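
In the consumer sample, each goroutine calls Receive again immediately after an error, which can produce a tight loop of error lines if the endpoint is temporarily unreachable. One possible refinement, sketched here with the standard library only, is a capped exponential backoff between failed attempts. The nextBackoff helper and the baseDelay and maxDelay values are hypothetical and not part of the SDK; tune them to your workload.

```go
package main

import (
	"fmt"
	"time"
)

const (
	// Illustrative values, not SDK defaults.
	baseDelay = 500 * time.Millisecond
	maxDelay  = 5 * time.Second
)

// nextBackoff returns the delay to wait after another failed attempt.
// It starts at base, doubles on every call, and never exceeds max.
// Passing cur = 0 means "no failure so far".
func nextBackoff(cur, base, max time.Duration) time.Duration {
	if cur <= 0 {
		return base
	}
	next := cur * 2
	if next > max {
		return max
	}
	return next
}

func main() {
	// Simulate six consecutive failures and print the delay after each one.
	var delay time.Duration
	for i := 0; i < 6; i++ {
		delay = nextBackoff(delay, baseDelay, maxDelay)
		fmt.Println(delay)
	}
}
```

In a receive loop, the delay would be advanced with nextBackoff and passed to time.Sleep when Receive returns an error, and reset to zero after a successful call.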

Step 4: Viewing Message Details

After a message is sent, you will receive a message ID (messageID). You can choose Message Query > Comprehensive Query in the console to query the recently sent message, including the message details and trace.

