参考 https://blog.csdn.net/asdf08442a/article/details/54882769 整理出来的测试 demo
1、producer 生产者
1 package com.bwdz.sp.comm.util.test; 2 3 import org.apache.rocketmq.client.exception.MQBrokerException; 4 import org.apache.rocketmq.client.exception.MQClientException; 5 import org.apache.rocketmq.client.producer.DefaultMQProducer; 6 import org.apache.rocketmq.client.producer.SendResult; 7 import org.apache.rocketmq.client.producer.SendStatus; 8 import org.apache.rocketmq.common.message.Message; 9 import org.apache.rocketmq.remoting.exception.RemotingException;10 11 import java.util.UUID;12 13 /**14 * Created by xy on 2018/11/16.15 */16 public class SyncProducer {17 private static DefaultMQProducer producer = null;18 19 public static void main(String[] args) {20 System.out.print("[----------]Start\n");21 int pro_count = 1;22 if (args.length > 0) {23 pro_count = Integer.parseInt(args[0]);24 }25 boolean result = false;26 try {27 ProducerStart();28 for (int i = 1; i < pro_count; i++) {29 String msg = "hello rocketmq "+ i+"".toString();30 SendMessage("qch_20170706", //topic31 "Tag"+i, //tag32 "Key"+i, //key33 msg); //body34 System.out.print(msg + "\n");35 }36 }finally {37 producer.shutdown();38 }39 System.out.print("[----------]Succeed\n");40 }41 42 private static boolean ProducerStart() {43 producer = new DefaultMQProducer("pro_qch_test");44 producer.setNamesrvAddr("192.168.69.173:9876");45 producer.setInstanceName(UUID.randomUUID().toString());46 try {47 producer.start();48 } catch(MQClientException e) {49 e.printStackTrace();50 return false;51 }52 return true;53 }54 55 private static boolean SendMessage(String topic,String tag,String key, String str) {56 Message msg = new Message(topic,tag,key,str.getBytes());57 try {58 SendResult result = producer.send(msg);59 SendStatus status = result.getSendStatus();60 System.out.println("___________________________SendMessage: "+status.name());61 } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {62 e.printStackTrace();63 return false;64 }65 return true;66 }67 }
2、consumer 消费者
1 package com.bwdz.sp.comm.util.test; 2 3 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; 4 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; 5 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; 6 import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; 7 import org.apache.rocketmq.common.consumer.ConsumeFromWhere; 8 import org.apache.rocketmq.common.message.MessageExt; 9 10 import java.util.List;11 import java.util.UUID;12 13 /**14 * Created by xy on 2018/11/16.15 */16 public class ConsumerTest {17 public static void main(String[] args) {18 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("con_qch_test");19 consumer.setInstanceName(UUID.randomUUID().toString());20 consumer.setConsumeMessageBatchMaxSize(32);21 consumer.setNamesrvAddr("192.168.69.173:9876");22 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);23 consumer.registerMessageListener(new MessageListenerConcurrently() {24 @Override25 public ConsumeConcurrentlyStatus consumeMessage(26 Listlist, ConsumeConcurrentlyContext consumeConcurrentlyContext) {27 for(MessageExt me : list) {28 if("Tag1".equals(me.getTags())){29 System.out.println("处理 Tag1 业务");30 System.out.println(new String(me.getBody()) + "消费成功" + "\n");31 }else if("Tag2".equals(me.getTags())){32 System.out.println("处理 Tag2 业务");33 System.out.println(new String(me.getBody()) + "消费成功" + "\n");34 }else if("Tag3".equals(me.getTags())){35 System.out.println("处理 Tag3 业务");36 System.out.println(new String(me.getBody()) + "消费失败" + "\n");37 return ConsumeConcurrentlyStatus.RECONSUME_LATER;38 }else{39 //consumer.subscribe("qch_20170706", "Tag1||Tag2||Tag3");40 System.out.println("过滤掉的业务"+ me.getKeys());41 }42 }43 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;44 }45 });46 try {47 consumer.subscribe("qch_20170706", "Tag1||Tag2||Tag3");48 consumer.start();49 } catch (Exception e) {50 e.printStackTrace();51 }52 }53 }
先运行 producer,控制台输出结果:
[----------]Start
___________________________SendMessage: SEND_OK
hello rocketmq 1
___________________________SendMessage: SEND_OK
hello rocketmq 2
___________________________SendMessage: SEND_OK
hello rocketmq 3
___________________________SendMessage: SEND_OK
hello rocketmq 4
[----------]Succeed
再运行consumer,控制台输出结果:
注:消息 “hello rocketmq 4”(Tag4)不在 consumer 的订阅表达式 consumer.subscribe("qch_20170706", "Tag1||Tag2||Tag3") 之内,被过滤掉了,所以不会被消费;消息 “hello rocketmq 3” 在消费时返回了 ConsumeConcurrentlyStatus.RECONSUME_LATER,表示消费失败。返回失败的消息之后还会被重新投递到 consumer 继续消费处理,其他消费成功的消息则不会被再一次消费。
处理 Tag2 业务
hello rocketmq 2消费成功
处理 Tag3 业务
hello rocketmq 3消费失败
处理 Tag1 业务
hello rocketmq 1消费成功
consumer再次运行,控制台输出结果(直到被指定成功ConsumeConcurrentlyStatus.CONSUME_SUCCESS,Broker服务才不会继续发送消息):
处理 Tag3 业务
hello rocketmq 3消费失败