tencent cloud

TDMQ for RocketMQ

Sending and Receiving Normal Messages

PDF
Modo Foco
Tamanho da Fonte
Última atualização: 2026-01-23 17:36:45
TDMQ for RocketMQ is compatible with the community edition HTTP SDK. If your client previously used the community edition HTTP SDK, you do not need to modify any client code when switching to TDMQ for RocketMQ.

Scenarios

If you use Hypertext Transfer Protocol (HTTP) for message sending and receiving, after introducing the open-source HTTP SDK into your client, TDMQ for RocketMQ allows HTTP-based access over private or public networks.
This document walks you through the process of sending and receiving messages using the HTTP SDK, helping you better understand the complete workflow.
Note:
Transactional messages are not compatible with HTTP.
If you are using a 4.x cluster, you should specify the protocol type (Transmission Control Protocol (TCP) or HTTP) when creating a consumer group. For more information, see Creating a Group. Therefore, the same consumer group does not support simultaneous consumption by both TCP and HTTP clients.

Prerequisites

You have installed Hypertext Preprocessor (PHP).
For more examples, see Demo in the open-source community.

Retry Mechanisms

HTTP uses a mechanism with a fixed retry interval:
Retry Interval
Maximum Number of Retries
5 minutes
You can modify the consumer group configuration to customize the maximum number of retries. The default is 16 times.
Note:
If the client acknowledges the message within the retry interval, it indicates that the message has been successfully consumed, and no further retries will occur.
If the client has not acknowledged the message by the end of the retry interval, the client will consume the message again.
The message handle for each consumption is only valid within the retry interval and expires afterward.

Operation Steps

Step 1: Installing the PHP Dependency Library

Introduce the relevant dependencies in your PHP project:
{
"require": {
"aliyunmq/mq-http-sdk": ">=1.0.4"
}
}

Step 2: Producing Messages

Creating a Message Producer

private $client;
private $producer;
public function __construct()
{
// Obtain the client.
$this->client = new MQClient(
endpoint,
accessKey,
secretKey
);

$topic = topicName;
$instanceId = namespace;

// Obtain the producer.
$this->producer = $this->client->getProducer($instanceId, $topic);
}
Note:
You can log in to the TDMQ for RocketMQ console to obtain the following parameters.
Parameter
Description
endpoint
Cluster access address. You can obtain the access address from the Access Information module on the Cluster Basic Information page in the console.
accessKey
Role token. You can copy it from the AccessKey column on the Cluster Permissions page in the console.
secretKey
Role name. You can copy the role name from the SecretKey column on the Cluster Permissions page in the console.
namespace
Namespace name of the exclusive cluster. You can copy the namespace name from the Namespace page in the console. If you are using a 4.x general cluster (rocketmq-xxx) or a 5.x cluster (rmq-xxx), specify the cluster ID for this parameter.
topicName
Topic name. You can copy the name from the Topic Management page in the console.

Sending Messages

public function run()
{
try
{
for ($i=0; $i<8; $i++)
{
$publishMessage = new TopicMessage(
"hello mq!"
);
// Set properties.
$publishMessage->putProperty("a", $i);
$result = $this->producer->publishMessage($publishMessage);
print "Send success. msgId is:" . $result->getMessageId() . ", bodyMD5 is:" . $result->getMessageBodyMD5() . "\\n";
}
} catch (\\Exception $e) {
print_r($e->getMessage() . "\\n");
}
}

Step 3: Consuming Messages

Creating a Consumer

private $client;
private $consumer;

public function __construct()
{
// Obtain the client.
$this->client = new MQClient(
endpoint,
accessKey,
secretKey
);

$topic = topicName;
$groupId = groupName;
$instanceId = namespace;

// Obtain the consumer.
$this->consumer = $this->client->getConsumer($instanceId, $topic, $groupId);
}
Note:
You can log in to the TDMQ for RocketMQ console to obtain the following parameters.
Parameter
Description
endpoint
Cluster access address. You can obtain the access address from the Access Information module on the Cluster Basic Information page in the console.
accessKey
Role token. You can copy it from the AccessKey column on the Cluster Permissions page in the console.
secretKey
Role name. You can copy the role name from the SecretKey column on the Cluster Permissions page in the console.
namespace
Namespace name of the exclusive cluster. You can copy the namespace name from the Namespace page in the console. If you are using a 4.x general cluster (rocketmq-xxx) or a 5.x cluster (rmq-xxx), specify the cluster ID for this parameter.
topicName
Topic name. You can copy the name from the Topic Management page in the console.
groupName
Consumer group name. You can copy the name from the Group Management page in the console.

Subscribing to Messages

public function run()
{
while (True) {
try {
// Consume messages through long round-robin scheduling.
// Long round-robin scheduling means that if there are no messages in the topic, the request waits on the server side. If messages are available for consumption, they are returned immediately.
// If your business is sensitive to consumption delays, it is highly recommended to pull messages concurrently.
$messages = $this->consumer->consumeMessage(
batchSize,
waitSeconds
);
} catch (\\Exception $e) {
if ($e instanceof MQ\\Exception\\MessageNotExistException) {
printf("No message, contine long polling!RequestId:%s\\n", $e->getRequestId());
continue;
}

print_r($e->getMessage() . "\\n");

sleep(1);
continue;
}

print "consume finish, messages:\\n";

$receiptHandles = array();
foreach ($messages as $message) {
$receiptHandles[] = $message->getReceiptHandle();
printf("MessageID:%s TAG:%s BODY:%s \\nPublishTime:%d, FirstConsumeTime:%d, \\nConsumedTimes:%d, NextConsumeTime:%d,MessageKey:%s\\n",
$message->getMessageId(), $message->getMessageTag(), $message->getMessageBody(),
$message->getPublishTime(), $message->getFirstConsumeTime(), $message->getConsumedTimes(), $message->getNextConsumeTime(),
$message->getMessageKey());
print_r($message->getProperties());
// Process business logic.
}

print_r($receiptHandles);
try {
$this->consumer->ackMessage($receiptHandles);
} catch (\\Exception $e) {
if ($e instanceof MQ\\Exception\\AckMessageException) {
printf("Ack Error, RequestId:%s\\n", $e->getRequestId());
foreach ($e->getAckMessageErrorItems() as $errorItem) {
printf("\\tReceiptHandle:%s, ErrorCode:%s, ErrorMsg:%s\\n", $errorItem->getReceiptHandle(), $errorItem->getErrorCode(), $errorItem->getErrorCode());
}
}
}
print "ack finish\\n";
}
}
Parameter
Description
batchSize
Number of messages pulled in a single batch. It supports up to 16 messages.
waitSeconds
Wait time in round-robin scheduling for a single pull request. It supports up to 30 seconds.


Ajuda e Suporte

Esta página foi útil?

comentários