源代码版本是3.2.6,还是直接跑源代码,启动配置参照前面写的《简介和quickstart》,启动顺序是namesrv,broker,filtersrv,filter和broker有顺序要求,如果filtersrv启动后找不到broker,则会System.exit()退出程序。
看rocketmq-filtersrv代码,核心processor包下的只有一个Class类且只处理2种类型的请求,即DefaultRequestProcessor.processRequest()只处理RequestCode.REGISTER_MESSAGE_FILTER_CLASS和RequestCode.PULL_MESSAGE:
REGISTER_MESSAGE_FILTER_CLASS:接收consumer端注册过来的filterClass源代码的请求。
PULL_MESSAGE:接收consumer端发出的拉消息的请求。
看代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 | < properties > < project.build.sourceEncoding >UTF-8</ project.build.sourceEncoding > < logback.version >1.0.13</ logback.version > < rocketmq.version >3.2.6</ rocketmq.version > </ properties > < dependencies > < dependency > < groupId >ch.qos.logback</ groupId > < artifactId >logback-classic</ artifactId > < version >1.0.13</ version > </ dependency > < dependency > < groupId >ch.qos.logback</ groupId > < artifactId >logback-core</ artifactId > < version >1.0.13</ version > </ dependency > < dependency > < groupId >com.alibaba.rocketmq</ groupId > < artifactId >rocketmq-client</ artifactId > < version >${rocketmq.version}</ version > </ dependency > < dependency > < groupId >junit</ groupId > < artifactId >junit</ artifactId > < version >4.10</ version > < scope >test</ scope > </ dependency > </ dependencies > |
filter类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | package com.zoo.quickstart.filter; import com.alibaba.rocketmq.common.filter.MessageFilter; import com.alibaba.rocketmq.common.message.MessageExt; public class MessageFilterImpl implements MessageFilter { @Override public boolean match(MessageExt msg) { String property = msg.getUserProperty( "SequenceId" ); if (property != null ) { int id = Integer.parseInt(property); if ((id % 3 ) == 0 && (id > 10 )) { return true ; } } return false ; } } |
producer类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 | package com.zoo.quickstart.filter; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message; public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer( "ProducerGroupName" ); producer.setNamesrvAddr( "192.168.0.119:9876" ); producer.start(); try { for ( int i = 0 ; i < 6000000 ; i++) { Message msg = new Message( "TopicFilter7" , // topic "TagA" , // tag "OrderID001" , // key ( "Hello MetaQ" ).getBytes()); // body msg.putUserProperty( "SequenceId" , String.valueOf(i)); SendResult sendResult = producer.send(msg); System.out.println(sendResult); Thread.sleep( 3000 ); } } catch (Exception e) { e.printStackTrace(); } producer.shutdown(); } } |
consumer类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 | package com.zoo.quickstart.filter; import java.util.List; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.MixAll; import com.alibaba.rocketmq.common.message.MessageExt; public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer( "ConsumerGroupNamecc4" ); consumer.setNamesrvAddr( "192.168.0.119:9876" ); // 使用Java代码,在服务器做消息过滤 String filterCode = MixAll.file2String( "D:\\workspace\\rocketmq-quickstart\\src\\main\\java\\com\\zoo\\quickstart\\filter\\MessageFilterImpl.java" ); consumer.subscribe( "TopicFilter7" , "com.zoo.quickstart.filter.MessageFilterImpl" , filterCode); consumer.registerMessageListener( new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); /** * Consumer对象在使用之前必须要调用start初始化,初始化一次即可<br> */ consumer.start(); System.out.println( "Consumer Started." ); } } |