博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RocketMq消息 demo
阅读量:5901 次
发布时间:2019-06-19

本文共 6009 字,大约阅读时间需要 20 分钟。

参考 https://blog.csdn.net/asdf08442a/article/details/54882769 整理出来的测试 demo

1、produce 生产者

1 package com.bwdz.sp.comm.util.test; 2  3 import org.apache.rocketmq.client.exception.MQBrokerException; 4 import org.apache.rocketmq.client.exception.MQClientException; 5 import org.apache.rocketmq.client.producer.DefaultMQProducer; 6 import org.apache.rocketmq.client.producer.SendResult; 7 import org.apache.rocketmq.client.producer.SendStatus; 8 import org.apache.rocketmq.common.message.Message; 9 import org.apache.rocketmq.remoting.exception.RemotingException;10 11 import java.util.UUID;12 13 /**14  * Created by xy on 2018/11/16.15  */16 public class SyncProducer {17     private static DefaultMQProducer producer = null;18 19     public static void main(String[] args) {20         System.out.print("[----------]Start\n");21         int pro_count = 1;22         if (args.length > 0) {23             pro_count = Integer.parseInt(args[0]);24         }25         boolean result = false;26         try {27             ProducerStart();28             for (int i = 1; i < pro_count; i++) {29                 String msg = "hello rocketmq "+ i+"".toString();30                 SendMessage("qch_20170706",              //topic31                         "Tag"+i,                           //tag32                         "Key"+i,                           //key33                          msg);                                  //body34                 System.out.print(msg + "\n");35             }36         }finally {37             producer.shutdown();38         }39         System.out.print("[----------]Succeed\n");40     }41 42     private static boolean ProducerStart() {43         producer = new DefaultMQProducer("pro_qch_test");44         producer.setNamesrvAddr("192.168.69.173:9876");45         producer.setInstanceName(UUID.randomUUID().toString());46         try {47             producer.start();48         } catch(MQClientException e) {49             e.printStackTrace();50             return false;51         }52         return true;53     }54 55     private static boolean SendMessage(String topic,String tag,String key, String str) {56         Message msg = new Message(topic,tag,key,str.getBytes());57         try {58             SendResult result = producer.send(msg);59             SendStatus status = result.getSendStatus();60             System.out.println("___________________________SendMessage: "+status.name());61         } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {62             e.printStackTrace();63             return false;64         }65         return true;66     }67 }
View Code

2、consumer 消费者

1 package com.bwdz.sp.comm.util.test; 2  3 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; 4 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; 5 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; 6 import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; 7 import org.apache.rocketmq.common.consumer.ConsumeFromWhere; 8 import org.apache.rocketmq.common.message.MessageExt; 9 10 import java.util.List;11 import java.util.UUID;12 13 /**14  * Created by xy on 2018/11/16.15  */16 public class ConsumerTest {17     public static void main(String[] args) {18         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("con_qch_test");19         consumer.setInstanceName(UUID.randomUUID().toString());20         consumer.setConsumeMessageBatchMaxSize(32);21         consumer.setNamesrvAddr("192.168.69.173:9876");22         consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);23         consumer.registerMessageListener(new MessageListenerConcurrently() {24             @Override25             public ConsumeConcurrentlyStatus consumeMessage(26                     List
list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {27 for(MessageExt me : list) {28 if("Tag1".equals(me.getTags())){29 System.out.println("处理 Tag1 业务");30 System.out.println(new String(me.getBody()) + "消费成功" + "\n");31 }else if("Tag2".equals(me.getTags())){32 System.out.println("处理 Tag2 业务");33 System.out.println(new String(me.getBody()) + "消费成功" + "\n");34 }else if("Tag3".equals(me.getTags())){35 System.out.println("处理 Tag3 业务");36 System.out.println(new String(me.getBody()) + "消费失败" + "\n");37 return ConsumeConcurrentlyStatus.RECONSUME_LATER;38 }else{39 //consumer.subscribe("qch_20170706", "Tag1||Tag2||Tag3");40 System.out.println("过滤掉的业务"+ me.getKeys());41 }42 }43 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;44 }45 });46 try {47 consumer.subscribe("qch_20170706", "Tag1||Tag2||Tag3");48 consumer.start();49 } catch (Exception e) {50 e.printStackTrace();51 }52 }53 }
View Code

 

先运行produce,控制台输出结果:

[----------]Start___________________________SendMessage: SEND_OKhello rocketmq 1___________________________SendMessage: SEND_OKhello rocketmq 2___________________________SendMessage: SEND_OKhello rocketmq 3___________________________SendMessage: SEND_OKhello rocketmq 4[----------]Succeed

再运行consumer,控制台输出结果:

注:消息 ”hello rocketmq 4“ 被consumer里47行代码过滤掉了,所以不会被消费;消息 “hello rocket 3” 在消费的时候被指定失败ConsumeConcurrentlyStatus.RECONSUME_LATER,表示消费失败,如果被指定失败,表明此消息下次还可以继续发送到consumer被继续消费处理,其他消息则不会被再一次消费

处理 Tag2 业务hello rocketmq 2消费成功处理 Tag3 业务hello rocketmq 3消费失败处理 Tag1 业务hello rocketmq 1消费成功

consumer再次运行,控制台输出结果(直到被指定成功ConsumeConcurrentlyStatus.CONSUME_SUCCESS,Broker服务才不会继续发送消息):

处理 Tag3 业务hello rocketmq 3消费失败

 

转载于:https://www.cnblogs.com/maxmoore/p/9988751.html

你可能感兴趣的文章
Jscex版Loading插件预览版本抢先看
查看>>
C#实现winform仿div+css半透明遮罩效果
查看>>
在ubuntu下安装freeradius的web管理站点ARA
查看>>
pygame学习笔记(5)——精灵
查看>>
在listview中的某一列添加按钮
查看>>
Phpcms之核心目录phpcms
查看>>
php中字符串转义函数
查看>>
YUV格式详解
查看>>
AndroidのUI设计研究(一)——自定义ProgressBar
查看>>
2012 Multi-University Training Contest 6
查看>>
QT 按钮(4种样式)
查看>>
踏破铁鞋,Vmware 8完美安装Mac Lion狮子系统,CPU不支持虚拟化,键盘无响经验共享...
查看>>
nutch 与 solr 的结合
查看>>
Introduction
查看>>
c#正则表达式 特殊格式内容的提取
查看>>
Javascript innerhtml
查看>>
hdu 1811 Rank of Tetris(并查集+拓扑排序)
查看>>
转:3位90后创业!PeakLabs推猛犸5等产品
查看>>
从简单工厂到工厂方法
查看>>
MySQL innodb_table_monitor 解析
查看>>