原文链接 译者:小村长
什么是广播
广播是向所有用户发送消息。 如果您希望所有订阅者都能收到有关某个主题的消息,则广播是一个不错的选择。
生产者示例
public class BroadcastProducer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); producer.start(); for (int i = 0; i < 100; i++){ Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } producer.shutdown(); } }
消费者示例
public class BroadcastConsumer { public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //set to broadcast mode consumer.setMessageModel(MessageModel.BROADCASTING); consumer.subscribe("TopicTest", "TagA || TagC || TagD"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n"); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Broadcast Consumer Started.%n"); } }