tencent cloud

TDMQ for RocketMQ

Using the C++ SDK

Last updated: 2026-01-23 17:52:24

Scenarios

This document uses the C++ SDK as an example to describe how to send and receive messages with the open-source software development kit (SDK), so that you can understand the complete messaging process.

Prerequisites

You have obtained the client connection parameters as instructed in SDK Overview.
You have installed a compiler suite that supports C++11.
You have installed Bazel (5.2.0) or CMake (3.13 or later).
If you compile with CMake, install gRPC in advance. Version 1.46.3 is recommended because later versions are incompatible with the SDK.
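
The C++11 requirement can be checked at compile time. The following is a minimal sketch that uses only the standard __cplusplus macro; isCpp11OrLater is a hypothetical helper name and is not part of the SDK. Note that MSVC reports 199711L for __cplusplus unless the /Zc:__cplusplus option is passed, so this check may fail there even when C++11 features are available.

```cpp
// Hypothetical helper, not part of the RocketMQ SDK.
// Returns true when a __cplusplus value denotes C++11 or later.
constexpr bool isCpp11OrLater(long standard) {
  return standard >= 201103L;
}

// Fails the build early if the compiler is not in C++11 mode or newer.
static_assert(isCpp11OrLater(__cplusplus), "A compiler in C++11 mode or later is required");
```

Placing the static_assert in any source file of the project makes the build stop with a readable message instead of failing later inside SDK headers.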

Operation Steps

Step 1: Installing the C++ SDK

Note:
The TDMQ for RocketMQ 5.x series does not support Transport Layer Security (TLS), and a patch is required.

Step 2: Producing Messages

#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstdint>
#include <cstdlib>
#include <iostream>
#include <memory>
#include <random>
#include <string>
#include <system_error>
#include <thread>
#include <utility>

#include "rocketmq/CredentialsProvider.h"
#include "rocketmq/Logger.h"
#include "rocketmq/Message.h"
#include "rocketmq/Producer.h"

using namespace ROCKETMQ_NAMESPACE;

// Returns the character set used to build random message bodies.
const std::string &alphaNumeric() {
  static std::string alpha_numeric("0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ");
  return alpha_numeric;
}

// Builds a random alphanumeric string of the requested length by
// repeatedly shuffling the character set and appending a prefix of it.
std::string randomString(std::string::size_type len) {
  std::string result;
  result.reserve(len);
  std::random_device rd;
  std::mt19937 generator(rd());
  std::string source(alphaNumeric());
  std::string::size_type generated = 0;
  while (generated < len) {
    std::shuffle(source.begin(), source.end(), generator);
    std::string::size_type delta = std::min({len - generated, source.length()});
    result.append(source.substr(0, delta));
    generated += delta;
  }
  return result;
}

static const std::string topic = "xxx";
// Enter the access address provided by Tencent Cloud.
static const std::string access_point = "rmq-xxx.rocketmq.xxxtencenttdmq.com:8081";
// Add the configured AccessKey and SecretKey.
static const std::string access_key = "xxx";
static const std::string access_secret = "xxx";
static const uint32_t total = 32;
static const int32_t message_body_size = 128;


int main(int argc, char *argv[]) {
  CredentialsProviderPtr credentials_provider;
  if (!access_key.empty() && !access_secret.empty()) {
    credentials_provider = std::make_shared<StaticCredentialsProvider>(access_key, access_secret);
  }

  // In most cases, you do not need to create many producers; the singleton pattern is recommended.
  auto producer = Producer::newBuilder()
                      .withConfiguration(Configuration::newBuilder()
                                             .withEndpoints(access_point)
                                             .withCredentialsProvider(credentials_provider)
                                             .withSsl(false)
                                             .build())
                      .withTopics({topic})
                      .build();

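  // Initialize the flag explicitly: before C++20, a default-constructed
  // std::atomic_bool holds an indeterminate value.
  std::atomic_bool stopped(false);
  std::atomic_long count(0);

  // Once per second, read and reset the send counter, then print it.
  auto stats_lambda = [&] {
    while (!stopped.load(std::memory_order_relaxed)) {
      // Atomically fetch the current count and reset it to zero.
      long cnt = count.exchange(0, std::memory_order_relaxed);
      std::this_thread::sleep_for(std::chrono::seconds(1));
      std::cout << "QPS: " << cnt << std::endl;
    }
  };

  std::thread stats_thread(stats_lambda);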

  std::string body = randomString(message_body_size);

  try {
    for (std::size_t i = 0; i < total; ++i) {
      auto message = Message::newBuilder()
                         .withTopic(topic)
                         .withTag("TagA")
                         .withKeys({"Key-" + std::to_string(i)})
                         .withBody(body)
                         .build();
      std::error_code ec;
      SendReceipt send_receipt = producer.send(std::move(message), ec);
      if (ec) {
        std::cerr << "Failed to publish message to " << topic << ". Cause: " << ec.message() << std::endl;
      } else {
        std::cout << "Publish message to " << topic << " OK. Message-ID: " << send_receipt.message_id
                  << std::endl;
        count++;
      }
    }
  } catch (...) {
    std::cerr << "Ah...No!!!" << std::endl;
  }
  stopped.store(true, std::memory_order_relaxed);
  if (stats_thread.joinable()) {
    stats_thread.join();
  }

  return EXIT_SUCCESS;
}

Parameter: Description
access_key: Role token. You can copy the token from the AccessKey column on the Cluster Permissions page in the console.
access_secret: Role name. You can copy the role name from the SecretKey column on the Cluster Permissions page in the console.
access_point: Cluster access address. You can obtain the access address from the Access Information module on the Cluster Basic Information page in the console.
topic: Topic name. You can copy the name from the Topic Management page in the console.
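
The producer sample above logs a failed send and moves on to the next message. For transient failures, a bounded retry may be preferable. The following is a minimal sketch that uses only the standard library: sendWithRetry and its parameters are hypothetical names, not SDK APIs, and the attempt callable stands in for a call such as producer.send(...) that reports failure through std::error_code.

```cpp
#include <chrono>
#include <functional>
#include <system_error>
#include <thread>

// Hypothetical helper, not part of the RocketMQ SDK.
// Calls `attempt` until it reports success or `max_attempts` is reached,
// sleeping `backoff` between tries and doubling it after each failure.
// Returns the error code of the last attempt (empty on success).
// If `max_attempts` is 0, `attempt` is never called and
// std::errc::invalid_argument is returned.
inline std::error_code sendWithRetry(const std::function<std::error_code()> &attempt,
                                     unsigned max_attempts,
                                     std::chrono::milliseconds backoff) {
  std::error_code ec = std::make_error_code(std::errc::invalid_argument);
  for (unsigned i = 0; i < max_attempts; ++i) {
    ec = attempt();
    if (!ec) {
      return ec;  // Success: stop retrying.
    }
    if (i + 1 < max_attempts) {
      // Wait before the next try, then lengthen the wait.
      std::this_thread::sleep_for(backoff);
      backoff *= 2;
    }
  }
  return ec;
}
```

Because the sample moves the message into send, a callable used with this helper would need to build a fresh message on every attempt, call send, and return the resulting error code.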

Step 3: Consuming Messages

The TDMQ for RocketMQ 5.x series supports two types of clients: Push Consumer and Simple Consumer.
The following sample code uses Push Consumer as an example.
#include <chrono>
#include <cstdlib>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <thread>

#include "rocketmq/CredentialsProvider.h"
#include "rocketmq/Logger.h"
#include "rocketmq/PushConsumer.h"

using namespace ROCKETMQ_NAMESPACE;

static const std::string topic = "xxx";
// Enter the access address provided by Tencent Cloud.
static const std::string access_point = "rmq-xxx.rocketmq.xxxtencenttdmq.com:8081";
// Add the configured AccessKey and SecretKey.
static const std::string access_key = "xxx";
static const std::string access_secret = "xxx";
static const std::string group = "group-xxx";

int main(int argc, char *argv[]) {
  auto &logger = getLogger();
  logger.setConsoleLevel(Level::Info);
  logger.setLevel(Level::Info);
  logger.init();

  // Subscribe to all tags of the topic.
  std::string tag = "*";

  // Invoked for each received message; returning SUCCESS acknowledges it.
  auto listener = [](const Message &message) {
    std::cout << "Received a message[topic=" << message.topic() << ", MsgId=" << message.id() << "]" << std::endl;
    return ConsumeResult::SUCCESS;
  };

  CredentialsProviderPtr credentials_provider;
  if (!access_key.empty() && !access_secret.empty()) {
    credentials_provider = std::make_shared<StaticCredentialsProvider>(access_key, access_secret);
  }

  // In most cases, you do not need to create many consumers; the singleton pattern is recommended.
  auto push_consumer = PushConsumer::newBuilder()
                           .withGroup(group)
                           .withConfiguration(Configuration::newBuilder()
                                                  .withEndpoints(access_point)
                                                  .withRequestTimeout(std::chrono::seconds(3))
                                                  .withCredentialsProvider(credentials_provider)
                                                  .withSsl(false)
                                                  .build())
                           .withConsumeThreads(4)
                           .withListener(listener)
                           .subscribe(topic, tag)
                           .build();

  // Keep the process alive so that the consumer can receive messages.
  std::this_thread::sleep_for(std::chrono::minutes(30));

  return EXIT_SUCCESS;
}

Parameter: Description
access_key: Role token. You can copy the token from the AccessKey column on the Cluster Permissions page in the console.
access_secret: Role name. You can copy the role name from the SecretKey column on the Cluster Permissions page in the console.
access_point: Cluster access address. You can obtain the access address from the Access Information module on the Cluster Basic Information page in the console.
group: Consumer group name. You can copy the name from the Group Management page in the console.
topic: Topic name. You can copy the name from the Topic Management page in the console.
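
The consumer sample above keeps the process alive with a fixed 30-minute sleep. One alternative is to block until a shutdown is requested or a timeout elapses. The following is a minimal sketch that uses only the standard library; ShutdownLatch is a hypothetical helper and is not part of the SDK.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical helper, not part of the RocketMQ SDK.
// Lets the main thread block until another thread requests shutdown
// or a timeout elapses, instead of sleeping for a fixed period.
class ShutdownLatch {
public:
  // Marks the latch as triggered and wakes all waiting threads.
  void trigger() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      triggered_ = true;
    }
    cv_.notify_all();
  }

  // Blocks for at most `timeout`. Returns true if trigger() was called,
  // false if the timeout elapsed first.
  template <typename Rep, typename Period>
  bool waitFor(const std::chrono::duration<Rep, Period> &timeout) {
    std::unique_lock<std::mutex> lock(mutex_);
    return cv_.wait_for(lock, timeout, [this] { return triggered_; });
  }

private:
  std::mutex mutex_;
  std::condition_variable cv_;
  bool triggered_ = false;
};
```

In the sample, the sleep_for call could be replaced by a call to waitFor with the same duration, with trigger() invoked from another thread when the application decides to stop. trigger() locks a mutex, so it should not be called directly from a signal handler.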

Step 4: Viewing Message Details

After a message is sent, you will receive a message ID (messageID). You can choose Message Query > General Query in the console to query the recently sent message, including the message details and trace.

