对于消息队列,生产者通常是入门第一个接触的对象,用于生产消息给消费者消费。本文通过介绍生产者实现类的属性、方法,引出生产者的启动过程、高可靠的实现方式等,主要介绍内容如下:

  • RocketMQ 支持3种消息:普通消息(并发消息)、顺序消息、事务消息
  • RocketMQ 支持3种发送方式:同步发送、异步发送、单向发送
  • RocketMQ 生产者最佳实践和总结

生产者原理

生产者概述

      发送消息的一方被称为生产者,它在整个 RocketMQ 的生产和消费体系中扮演的角色如下图所示。
请输入图片描述

  • 生产者组:一个逻辑概念,在使用生产者实例的时候需要指定一个组名。一个生产者组可以生成多个Topic的消息。
  • 生产者实例:一个生产者组部署了多个进程,每个进程都可以成为一个生产者实例。
  • Topic:主题名字,一个 Topic 由若干个 Queue组成。

      RocketMQ 客户端中的生产者有两个实现类:org.apache.rocketmq.client.producer.DefaultMQProducerorg.apache.rocketmq.client.producer.TransactionMQProducer。前者用户生产普通消息、顺序消息、单向消息、批量消息、延迟消息,后者主要用户生产事务消息。

请输入图片描述

小试牛刀

      如下演示了第一个生产者实例:

public class Producer {
    public static void main(String[] args) throws Exception {
        final DefaultMQProducer producer = new DefaultMQProducer("pay_group");
        producer.setNamesrvAddr("xxxx:9876");
        producer.setRetryTimesWhenSendAsyncFailed(2);
        producer.start();
        System.out.println("Producer Starting...");
        // Thread.sleep(5000);
        final Message message = new Message("xxx", "xxx",  "First Demo".getBytes(RemotingHelper.DEFAULT_CHARSET));
        final SendResult sendResult = producer.send(message);
        System.out.println(sendResult);

        producer.shutdown();
    }
}

请输入图片描述

消息结构和消息类型

      消息类是发送的主体,如下展示了一些核心字段和方法(省略细节代码):

public class Message implements Serializable {
    private static final long serialVersionUID = 8445773977080406428L;

    private String topic;
    private int flag;
    private Map<String, String> properties;
    private byte[] body;
    private String transactionId;

    public void setKeys(String keys) {}
    public void setKeys(Collection<String> keys) {}
    public void setTags(String tags) {}
    public void setDelayTimeLevel(int level) {}
    public void setTopic(String topic) {}
    public void putUserProperty(final String name,final String value) {}
}
  • topic:主题名字,可以通过 RocketMQ Console 创建。
  • Flag:目前没有用。
  • Properties:消息扩展信息,Tag、keys、延迟级别都保存在这里。
  • Body:消息体,字节数组。需要注意生产者和消费者应该使用相同的编码,否则会产生乱码。
  • transactionId:事务ID。
  • setKeys():设置消息的 key,多个key可以用MesssageConst.KEY_SPEARATOR分隔或者直接用另一个重载方法。如果 Broker 中 messageIndexEnable=true 则会根据 key 创建消息的 Hash 索引来帮助用户进行快速查询。
  • setTags():消息过滤的标记,用户可以订阅某个 Topic 的某些 Tag,这样 Broker 只会把订阅了 topic-tag 的消息发送给消费者。
  • setDelayTimeLevel():设置延迟级别,延迟多久消费者可以消费。
  • putUserProperty():如果还有其他扩展信息,可以设置在 properties 中。重复调用后覆盖旧值。

消息的类别

      Rocket MQ 支持普通消息、分区有序消息、全局有序消息、延迟消息和事务消息。

  • 普通消息:普通消息也称为并发消息,和传统的队列相比,并发消息没有顺序,但是生产消费都是并行进行的,单机性能可达十万级 TPS。
  • 分区有序消息:与 Kafka 中的分区类似,把一个 Topic 消息分为多个分区“保存”和消费,在一个分区内的消息就是传统的队列,遵循 FIFO 原则。
  • 全局有序消息:如果吧一个 Topic 的分区数设置为1,那么该 Topic 中的消息就是单分区,所有消息都遵循 FIFO 的原则。
  • 延迟消息:消息发送后,消费者要在一定的时间后,或者指定的某个时间点才可以消费。在没有延迟消息时,基本的做法是基于定时任务调度,定时发送消息。
  • 事务消息:主要涉及分布式事务,即需要保证在多个操作同时成功或者同时失败时消费者才能消费消息。RocketMQ 通过发送 Half 消息、处理本地事务、提交消息或者回滚消息优雅地实现分布式事务。

生产者高可用

      在发送消息的过程中,客户端、Broker、Namesrv 都有可能发生异常,在发生异常时我们依旧需要保证消息的可靠发送。

客户端保证

      第一种保证机制:重试机制。RocketMQ 支持同步、异步发送消息,不管使用哪种方式都可以配置失败后重试,如果单个 Broker 发生故障,重试会选择其他的 Broker 保证消息的正常发送。

配置项retryTimesWhenSendFailed表示同步重试次数,默认为2次,加上正常发送1次,总共3次机会。

      第二种保证机制:客户端容错。RocketMQ Client 会维护一个"Broker-发送延迟"关系,根据这个关系选择一个发送延迟级别较低的 Broker 来发送消息,这样能最大限度地利用 Broker 的能力,剔除已经宕机、不可用或者发送延迟级别较高的 Broker,尽量保证消息的正常发送。

Broker端保证

      单节点的 Broker 无法保证数据的可靠性,生产环境中建议部署2个Master和2个Slave。主从直接的同步方式分为同步复制异步复制

  • 同步复制:消息发送到 Master Broker 后,同步到 Slave Broker 才算发送成功;
  • 异步复制:消息发送到 Master Broker 即表示发送成功。

生产者启动流程

请输入图片描述

  • 第一步:通过 switch-case 判断当前生产者的服务状态,创建时默认状态为 CREATE_JUST 。设置默认启动状态为启动失败。
  • 第二步:执行 checkConfig() 方法。校验生产者实例设置的各种参数。比如生产者组名是否为空、是否满足命名规则、长度是否满足等。
  • 第三步:执行 changeInstanceNameToPID() 方法。校验 instance name,如果是默认名字则将其修改为进程id。
  • 第四步:执行 getAndCreateMQClientInstance() 方法。根据生产者组名获取或者初始化一个 MQClientInstance。初始化代码如下所示:
public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
    String clientId = clientConfig.buildMQClientId();
    MQClientInstance instance = this.factoryTable.get(clientId);
    if (null == instance) {
        // ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable
        instance =
            new MQClientInstance(clientConfig.cloneClientConfig(),
                this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
        MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
        if (prev != null) {
            instance = prev;
            log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
        } else {
            log.info("Created new MQClientInstance for clientId:[{}]", clientId);
        }
    }

    return instance;
}

      MQClientInstance 实例与 clientId 是一一对应的,而 clientId 是由 clientIP、instanceName 及 unitName 构成的。一般来讲,为了减少客户端的使用资源,如果将所有的 instanceName 和 unitName 设置为相同的值,就只会创建一个 MQClientInstance 实例。生成 clientId 的代码如下所示:

public String buildMQClientId() {
    StringBuilder sb = new StringBuilder();
    sb.append(this.getClientIP());

    sb.append("@");
    sb.append(this.getInstanceName());
    if (!UtilAll.isBlank(this.unitName)) {
        sb.append("@");
        sb.append(this.unitName);
    }

    return sb.toString();
}

      MQClientInstance 实例的功能是管理本实例中全部生产者和消费者的生产和消费行为。核心属性如下所示:

public class MQClientInstance {
    ...
    private final String clientId;
    private final long bootTimestamp = System.currentTimeMillis();
    private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
    private final ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();
    private final ConcurrentMap<String/* group */, MQAdminExtInner> adminExtTable = new ConcurrentHashMap<String, MQAdminExtInner>();
    private final NettyClientConfig nettyClientConfig;
    private final MQClientAPIImpl mQClientAPIImpl;
    private final MQAdminImpl mQAdminImpl;
    private final ConcurrentMap<String/* Topic */, TopicRouteData> topicRouteTable = new ConcurrentHashMap<String, TopicRouteData>();
    private final Lock lockNamesrv = new ReentrantLock();
    private final Lock lockHeartbeat = new ReentrantLock();
    private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable =
        new ConcurrentHashMap<String, HashMap<Long, String>>();
    private final ConcurrentMap<String/* Broker Name */, HashMap<String/* address */, Integer>> brokerVersionTable =
        new ConcurrentHashMap<String, HashMap<String, Integer>>();
    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "MQClientFactoryScheduledThread");
        }
    });
    private final ClientRemotingProcessor clientRemotingProcessor;
    private final PullMessageService pullMessageService;
    private final RebalanceService rebalanceService;
    private final DefaultMQProducer defaultMQProducer;
    private final ConsumerStatsManager consumerStatsManager;
    ...
}
  • producerTable:当前 client 实例的全部生产者的内部实例
  • consumerTable:当前 client 实例的全部消费者的内部实例
  • adminExtTable:当前 client 实例的全部管理实例
  • mQClientAPIImpl:每个 client 也是一个Netty Server,也会支持 Broker 访问,这里实现了全部 client 支持的接口
  • topicRouterTable:当前生产者、消费者中全部 Topic 的本地缓存路由信息
  • scheduledExecutorService:本地定时任务,比如定期获取当前 Namesrv 地址、定期同步 Namesrv 信息、定期更新 Topic 路由信息、定期发送心跳信息给 Broker 、定期清理已下线的 Broker 、定期持久化消费位点、定期调整消费线程数
  • clientRemotingProcessor:请求的处理器,从处理方法 processRequest() 中我们可以知道目前支持哪些功能接口
  • pullMessageService:Pull 服务
  • rebalanceService:重新平衡服务。定期执行重新平衡方法 this.mqClientFactory.doRebalance()。这里的 mqClientFactory 就是 MQClientInstance 实例,通过依次调用 MQClientInstance 中保存的消费者实例的 doRebalance() 方法,来感知订阅关系的变化、集群变化等,以达到重新平衡
  • consumerStatsManager:消费监控。比如拉取RT(Response Time,响应时间)、拉取TPS(Transactions Per Second,每秒处理消息数)、消费RT等

      核心方法如下所示:

public class MQClientInstance {
  ...
  public void updateTopicRouteInfoFromNameServer(){}
  private void cleanOfflineBroker() {}
  public void checkClientInBroker() throws MQClientException {}
  public void sendHeartbeatToAllBrokerWithLock() {}
  public boolean updateTopicRouteInfoFromNameServer(final String topic) {}
  public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer) {}
  public boolean registerConsumer(final String group,MQConsumerInner consumer) {}
  public void unregisterConsumer(final String group) {}
  public boolean registerProducer(final String group, DefaultMQProducerImpl producer) {}
  public void unregisterProducer(final String group) {}
  public boolean registerAdminExt(final String group, MQAdminExtInner admin) {}
  public void unregisterAdminExt(final String group) {}
  public void rebalanceImmediately() {}
  public void doRebalance() {}
  public FindBrokerResult findBrokerAddressInAdmin(final String brokerName) {}
  public String findBrokerAddressInPublish(final String brokerName) {}
  public FindBrokerResult findBrokerAddressInSubscribe(final String brokerName, final long brokerId, final boolean onlyThisBroker) {}
  public List<String> findConsumerIdList(final String topic, final String group) {}
  public String findBrokerAddrByTopic(final String topic) {}
  public void resetOffset(String topic, String group, Map<MessageQueue, Long> offsetTable) {}
  public Map<MessageQueue, Long> getConsumerStatus(String topic, String group) {}
  public concurrent.ConcurrentMap<String, TopicRouteData> getTopicRouteTable() {}
  public ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg, final String consumerGroup, final String brokerName) {}
  public ConsumerRunningInfo consumerRunningInfo(final String consumerGroup) {}
  ...
}
  • updateTopicRouteInfoFromNameServer:从多个 Namesrv 中获取最新 Topic 路由信息,更新本地缓存
  • cleanOfflineBroker:清理已经下线的 Broker
  • checkClientInBroker:检查 Client 是否在 Broker 中有效
  • sendHeartbeatToAllBrokerWithLock:发送客户端的心跳信息给所有的Broker
  • registerConsumer:在本地注册一个消费者
  • unregisterConsumer:取消本地注册的消费者
  • registerProducer:在本地注册一个生产者
  • unregisterProducer:取消本地注册的生产者
  • registerAdminExt:注册一个管理实例
  • rebalanceImmediately:立即执行一次 Rebalance。该操作是通过 RocketMQ 的一个 CountDownLatch2 锁来实现的
  • doRebalance:对于所有已经注册的消费者实例,执行一次 Rebalance
  • findBrokerAddressInAdmin:在本地缓存中查找 Master 或者 Slave Broker 的信息
  • findBrokerAddressInSubscribe:在本地缓存种查找 Slave Broker 信息
  • findBrokerAddressInPublish:在本地缓存种查找 Master Broker 地址
  • findConsumerIdList:查找消费者 id 列表
  • findBrokerAddrByTopic:通过 Topic 名字查找 Broker 地址
  • resetOffset:重置消费位点
  • getConsumerStatus:获取一个订阅关系中每个队列的消费进度
  • getTopicRouteTable:获取本地缓存 Topic 路由
  • consumeMessageDirectly:直接将消息发送给指定的消费者消费,和正常投递不同的是,指定了已经订阅的消费者组中的一个,而不是全部已经订阅的消费者。一般适用于在消费消息后,某一个消费者组想再消费一次的场景
  • consumerRunningInfo:获取消费者的消费统计信息。包含消费RT、消费TPS等

消息发送流程

      RocketMQ 客户端的消息发送通常分为以下3层:
      业务层:通常指直接调用 RocketMQ Client 发送 API 的业务代码
      消息处理层:指 RocketMQ Client 获取业务发送的消息对象后,一系列的参数检查、消息发送准备、参数包装等操作
      通信层:指 RocketMQ 基于 Netty 封装的一个 RPC 通信服务,RocketMQ 的各个组件之间的通信全部使用该通信层。
请输入图片描述

  • 第一步:调用 defaultMQProducerImpl.send() 方法发送消息
  • 第二步:通过设置的发送超时时间,调用 defaultMQProducerImpl.send() 方法发送消息。设置的超时时间可以通过 sendMsgTimeout 进行变更,其默认值为3s
  • 第三步:执行 defaultMQProducerImpl.sendDefaultImpl() 方法: private SendResult sendDefaultImpl(Message msg,final CommunicationMode communicationMode,final SendCallback sendCallback,final long timeout)communicationMode:通信模式,同步、异步还是单向。sendCallback:对于异步模式,需要设置发送完成后的回调。

      sendDefaultImpl方法是发送消息的核心方法,执行过程分为5步:

  • 第一步:两个检查:生产者状态、消息及消息内容。没有运行的生产者不能发送消息。消息检查主要检查消息是否为空,消息的 Topic 的名字是否为空或者是否符合规范:消息体大小是否符合要求,最大值为4MB,可以通过 maxMessageSize 进行设置。
  • 第二步:执行tryToFindTopicPublishInfo方法,获取 Topic 路由信息,如果不存在则发出异常提醒用户。如果本地缓存没有路由信息,就通过 Namesrv 获取路由信息,更新到本地,再返回。具体代码如下所示:
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
    // topicPublishInfo.ok(): null != this.messageQueueList && !this.messageQueueList.isEmpty()
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
    }

    if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
        return topicPublishInfo;
    } else {
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
        return topicPublishInfo;
    }
}
  • 第三步:计算消息发送的重试次数timesTotal,同步重试和异步重试的执行方式是不同的。
  • 第四步:执行队列选择方法 selectOneMessageQueue()。根据队列对象中保存的上次发送消息的 Broker 的名字和 Topic 路由,选择(轮询)一个 Queue 将消息发送到 Broker。我们可以通过 sendLatencyFaultEnable来设置是否总是发送到延迟级别较低的 Broker,默认值为 Flase。
  • 第五步:执行sendKernelImpl方法。该方法是发送消息的核心方法,主要用于准备通信层的入参(比如Broker地址、请求体等),将请求传递给通信层,内部实现是基于Netty的,在封装为通信层 request 对象 RemotingCommand 前,会设置 RequestCode 表示当前请求时发送单个消息还是批量消息。

最佳实践

发送顺序消息

      同步发送消息是,根据 HashKey 将消息发送到指定的分区中,每个分区中的消息都是按照发送顺序保存的,即分区有序。如果 Topic 的分区被设置为1,这个 Topic 的消息就是全局有序的。注意:顺序消息的发送必须是单线程,多线程将不再有序。顺序消息的消费和普通消息的消费方式不同。

public class OrderMessageProducer {

    public static void main(String[] args) throws MQClientException, InterruptedException, UnsupportedEncodingException, RemotingException, MQBrokerException {
        final DefaultMQProducer producer = new DefaultMQProducer("pay_group");
        producer.setNamesrvAddr("192.168.233.145:9876");
        producer.setRetryTimesWhenSendAsyncFailed(2);
        producer.start();
        System.out.println("Producer Starting...");
        // Thread.sleep(5000);
        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 100; i++) {
            int orderId = i % 10;
            //Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("catwinghu", tags[i % tags.length], "KEY" + i,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object orderId) {
                    Integer id = (Integer) orderId;
                    int index = id % mqs.size();
                    return mqs.get(index);
                }
            }, orderId);

            System.out.printf("%s%n", sendResult);
        }

        producer.shutdown();

    }
}

      如下图展示了发送结果,不难发现orderId%5为0 的order都发送到了0号队列,实现了顺序性。
请输入图片描述

发送延迟消息

      生产者发送消息后,消费者在指定时间才能消费消息,这类消息被称为延迟消息或定时消息。生产者发送延迟消息前需要设置延迟级别,目前开源版本支持18个延迟级别:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

public class DelayMessageProducer {
    public static void main(String[] args) throws Exception{
        final DefaultMQProducer producer = new DefaultMQProducer("pay_group");
        producer.setNamesrvAddr("192.168.233.145:9876");
        producer.setRetryTimesWhenSendAsyncFailed(2);
        producer.start();
        Thread.sleep(5000);
        System.out.println("Producer Starting...");
        final Message message = new Message("catwinghu", "taga",  "DelayMessage".getBytes(RemotingHelper.DEFAULT_CHARSET));
        // 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
        // 设置延迟级别
        message.setDelayTimeLevel(3);
        final SendResult sendResult = producer.send(message);
        System.out.println(sendResult);

        producer.shutdown();
    }
}

      Broker 在接收到用户发送的消息后,首先将消息保存到名为SCHEDULE_TOPIC_XXX的 Topic 中。此时消费者无法消费到该延迟消息。然后,由 Broker 端的定时投递任务定时投递给消费者。保存延迟消息的实现逻辑见org.apache.rocketmq.store.schedule.ScheduleMessageService类。按照配置的延迟级别初始化多个任务,每秒执行一次,若消息投递满足时间条件,则将消息投递到原始的 Topic 中。发送(投递)延迟消息定时任务(DeliverDelayedMessageTimerTask)代码如下所示:

class DeliverDelayedMessageTimerTask extends TimerTask {
    /**
     * 延迟级别
     */
    private final int delayLevel;
    /**
     * 位置
     */
    private final long offset;

    public DeliverDelayedMessageTimerTask(int delayLevel, long offset) {
        this.delayLevel = delayLevel;
        this.offset = offset;
    }

    @Override
    public void run() {
        try {
            this.executeOnTimeup();
        } catch (Exception e) {
            // XXX: warn and notify me
            log.error("ScheduleMessageService, executeOnTimeup exception", e);
            ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
                this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
        }
    }

    /**
     * 纠正可投递时间。
     * 因为发送级别对应的发送间隔可以调整,如果超过当前间隔,则修正成当前配置,避免后面的消息无法发送。
     *
     * @param now 当前时间
     * @param deliverTimestamp 投递时间
     * @return 纠正结果
     */
    private long correctDeliverTimestamp(final long now, final long deliverTimestamp) {
        long result = deliverTimestamp;

        long maxTimestamp = now + ScheduleMessageService.this.delayLevelTable.get(this.delayLevel);
        if (deliverTimestamp > maxTimestamp) {
            result = now;
        }

        return result;
    }

    public void executeOnTimeup() {
        ConsumeQueue cq = ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC,  delayLevel2QueueId(delayLevel));

        long failScheduleOffset = offset;

        if (cq != null) {
            SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
            if (bufferCQ != null) {
                try {
                    long nextOffset = offset;
                    int i = 0;
                    for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                        long offsetPy = bufferCQ.getByteBuffer().getLong();
                        int sizePy = bufferCQ.getByteBuffer().getInt();
                        long tagsCode = bufferCQ.getByteBuffer().getLong();

                        long now = System.currentTimeMillis();
                        long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);

                        nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

                        long countdown = deliverTimestamp - now;

                        if (countdown <= 0) { // 消息到达可发送时间
                            MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);
                            if (msgExt != null) {
                                try {
                                    // 发送消息
                                    MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
                                    PutMessageResult putMessageResult = ScheduleMessageService.this.defaultMessageStore.putMessage(msgInner);
                                    if (putMessageResult != null && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) { // 发送成功
                                        continue;
                                    } else { // 发送失败
                                        // XXX: warn and notify me
                                        log.error("ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}", msgExt.getTopic(), msgExt.getMsgId());

                                        // 安排下一次任务
                                        ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), DELAY_FOR_A_PERIOD);

                                        // 更新进度
                                        ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                                        return;
                                    }
                                } catch (Exception e) {
                                    // XXX: warn and notify me
                                    log.error("ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
                                            + msgExt + ", nextOffset=" + nextOffset + ",offsetPy=" + offsetPy + ",sizePy=" + sizePy, e);
                                }
                            }
                        } else {
                            // 安排下一次任务
                            ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), countdown);

                            // 更新进度
                            ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                            return;
                        }
                    } // end of for

                    nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

                    // 安排下一次任务
                    ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);

                    // 更新进度
                    ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                    return;
                } finally {
                    bufferCQ.release();
                }
            } // end of if (bufferCQ != null)
            else { // 消费队列已经被删除部分,跳转到最小的消费进度
                long cqMinOffset = cq.getMinOffsetInQueue();
                if (offset < cqMinOffset) {
                    failScheduleOffset = cqMinOffset;
                    log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
                        + cqMinOffset + ", queueId=" + cq.getQueueId());
                }
            }
        } // end of if (cq != null)

        ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, failScheduleOffset), DELAY_FOR_A_WHILE);
    }

    /**
     * 设置消息内容
     *
     * @param msgExt 消息
     * @return 消息
     */
    private MessageExtBrokerInner messageTimeup(MessageExt msgExt) {
        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        msgInner.setBody(msgExt.getBody());
        msgInner.setFlag(msgExt.getFlag());
        MessageAccessor.setProperties(msgInner, msgExt.getProperties());

        TopicFilterType topicFilterType = MessageExt.parseTopicFilterType(msgInner.getSysFlag());
        long tagsCodeValue =
            MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
        msgInner.setTagsCode(tagsCodeValue);
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));

        msgInner.setSysFlag(msgExt.getSysFlag());
        msgInner.setBornTimestamp(msgExt.getBornTimestamp());
        msgInner.setBornHost(msgExt.getBornHost());
        msgInner.setStoreHost(msgExt.getStoreHost());
        msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());

        msgInner.setWaitStoreMsgOK(false);
        MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_DELAY_TIME_LEVEL);

        msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC));

        String queueIdStr = msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID);
        int queueId = Integer.parseInt(queueIdStr);
        msgInner.setQueueId(queueId);

        return msgInner;
    }
}

发送单向消息

      单向消息的生产者只管发送过程,不管发送结果。单向消息主要用于日志传输等消息允许丢失的场景。

public class OnewayProducer {

    public static void main(String[] args) throws Exception {
        final DefaultMQProducer producer = new DefaultMQProducer("pay_group");
        producer.setNamesrvAddr("192.168.233.145:9876");
        producer.setRetryTimesWhenSendAsyncFailed(2);
        producer.start();
        System.out.println("Producer Starting...");
        Thread.sleep(5000);
        final Message message = new Message("catwinghu", "taga",  "OneWay Demo".getBytes(RemotingHelper.DEFAULT_CHARSET));
        producer.sendOneway(message);

        producer.shutdown();
    }
}

批量发送消息

      批量消息发送能提高发送效率,提升系统吞吐量。批量消息发送有以下3点注意事项:

  1. 消息最好小于1MB
  2. 同一批批量消息的Topic、waitStoreMsgOK属性必须一致
  3. 批量消息不支持延迟消息
public class BatchMsgProducer {
    public static void main(String[] args) throws Exception {
        final DefaultMQProducer producer = new DefaultMQProducer("batch_group");
        producer.setNamesrvAddr("192.168.233.145:9876");
        producer.setRetryTimesWhenSendAsyncFailed(2);
        producer.start();
        System.out.println("Producer Starting...");
        Thread.sleep(5000);
        final List<Message> messages = Arrays.asList(new Message("catwinghu", "taga", "Order001".getBytes(RemotingHelper.DEFAULT_CHARSET)),
                new Message("catwinghu", "taga", "Order002".getBytes(RemotingHelper.DEFAULT_CHARSET)),
                new Message("catwinghu", "taga", "Order003".getBytes(RemotingHelper.DEFAULT_CHARSET)));

        final SendResult sendResult = producer.send(messages);
        System.out.println(sendResult);

        producer.shutdown();
    }
}

请输入图片描述

发送事务消息

      事务消息的发送、消费流程和延迟消息类似,都是先发送到一个对消费者不可见的 Topic 中。当事务被业务提交后,会被二次投递到原始的 Topic 中,此时消费者正常消费,事务消息的发送具体分为以下两个步骤:

  • 第一步:用户发送一个 Half 消息到 Broker,Broker 设置 queueOffset=0,即对消费者不可见
  • 第二步:用户本地事务处理成功,发送一个 Commit 消息到 Broker,Broker 修改 queueOffset 为正常值,达到重新投递的目的,此时消费者可以正常消费;如果本地事务处理失败,那么僵发送一个 Rollback 消息给 Broker,Broker 将删除 Half 消息。为防止生产者提交 Commit 消息出现异常,Broker 会定期回查生产者,确认生产者本地事务的执行状态,再决定是提交还是回滚。

请输入图片描述

      整体交互流程图如下所示:
请输入图片描述

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.io.UnsupportedEncodingException;
import java.util.concurrent.*;

public class TransactionProducer {
    private String producerGroup = "transaction_group";

    //事务监听器
    private TransactionListener listener = new TransactionCheckListenerImpl();

    private TransactionMQProducer producer = null;

    private ExecutorService executorService = new ThreadPoolExecutor(2,5,100, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(2000),new ThreadFactory(){
        @Override
        public Thread newThread(Runnable r){
            final Thread thread = new Thread(r);
            thread.setName("client-transaction-msg-check-thread");
            return thread;
        }
    });

    public TransactionProducer(){
        producer = new TransactionMQProducer(producerGroup);
        producer.setNamesrvAddr("192.168.233.145:9876");
        producer.setTransactionListener(listener);
        producer.setExecutorService(executorService);
        start();
    }

    public void start(){
        try {
            this.producer.start();
            Thread.sleep(5000);
            System.out.println("Producer Starting...");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void shutdown(){
        this.producer.shutdown();
    }

    public TransactionMQProducer getProducer(){
        return producer;
    }

    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException {
        final TransactionProducer transactionProducer = new TransactionProducer();
        final TransactionMQProducer producer = transactionProducer.getProducer();
        final Message trmsg1 = new Message("catwinghu", "taga","1111", "TransactionMsg1".getBytes(RemotingHelper.DEFAULT_CHARSET));
        final Message trmsg2 = new Message("catwinghu", "taga","2222", "TransactionMsg2".getBytes(RemotingHelper.DEFAULT_CHARSET));
        final Message trmsg3 = new Message("catwinghu", "taga","3333", "TransactionMsg3".getBytes(RemotingHelper.DEFAULT_CHARSET));
        producer.sendMessageInTransaction(trmsg1,1);
        producer.sendMessageInTransaction(trmsg2,2);
        producer.sendMessageInTransaction(trmsg3,3);
    }

}


class TransactionCheckListenerImpl implements TransactionListener {

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        System.out.println("执行本地事务");
        final String body = new String(msg.getBody());
        final String keys = msg.getKeys();
        final String transactionId = msg.getTransactionId();
        System.out.printf("transactionId=%s key=%s body=%s\n",transactionId,keys,body);

        final int status = Integer.parseInt(arg.toString());
        if(status == 1){
            System.out.println("提交");
            return LocalTransactionState.COMMIT_MESSAGE;
        }
        if(status == 2){
            System.out.println("回滚");
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        // UNKNOW 会回查
        System.out.println("回查");
        return LocalTransactionState.UNKNOW;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        System.out.println("事务回查");
        final String body = new String(msg.getBody());
        final String keys = msg.getKeys();
        final String transactionId = msg.getTransactionId();
        System.out.printf("transactionId=%s key=%s body=%s\n",transactionId,keys,body);
        // 只有Commit、Rollback
        //可以根据key去检查本地事务消息是否完成
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

请输入图片描述

Last modification:November 13th, 2021 at 11:35 pm
如果觉得我的文章对你有用,请随意赞赏