V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
RedBeanIce
V2EX  ›  Java

rocketmq filter message 使用消息丢失问题

  •  
  •   RedBeanIce · 2020-10-18 13:13:03 +08:00 · 1179 次点击
    这是一个创建于 1285 天前的主题,其中的信息可能已经有所发展或是发生改变。

    linux rocketmq 4.7.1

    rocketmq-spring-boot-starter 2.1.1

    生产者代码

    public class FilterProducerTag {
        public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
            DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
            producer.setNamesrvAddr("192.168.42.131:9876");
            producer.start();
    
            String[] stringArr = {"TagA", "TagB", "TagC"};
            for (int i = 0; i < 6; i++) {
                Message message = new Message();
                message.setTopic("TopicTestFilter");
                // message.setTags(stringArr[i % stringArr.length]);
                message.setTags("TagA");
                message.setKeys("KEY" + i);
                message.setBody(("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
    
                // Set some properties.
                SendResult sendResult = producer.send(message);
                System.out.println("=========================================");
                System.out.println(sendResult);
                System.out.println(message);
                System.out.println("=========================================");
            }
            producer.shutdown();
        }
    }
    

    消费者代码

    如下

    public class FilterConsumerTag {
        public static void main(String[] args) throws MQClientException {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
            consumer.setNamesrvAddr("192.168.42.131:9876");
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            // only subsribe messages have property a, also a >=0 and a <= 3
    
            // consumer.subscribe("TopicTestFilter", MessageSelector.bySql("a between 0 and 2"));
    
            consumer.subscribe("TopicTestFilter", "TagA || TagB");
    
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messageExtList, ConsumeConcurrentlyContext context) {
                    for (MessageExt messageExt : messageExtList) {
                        System.out.println("========================================================");
                        System.out.println(messageExt);
                        System.out.println(new String(messageExt.getBody()));
                        System.out.println("========================================================");
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
            System.out.println("consumer");
        }
    }
    

    问题

    我在消费者方只收到生产者的四条消息,按道理我应该收到 6 条,全部

    日志

    生产者日志

    RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.InternalThreadLocalMap).
    RocketMQLog:WARN Please initialize the logger system properly.
    =========================================
    SendResult [sendStatus=SEND_OK, msgId=64774C6B275C18B4AAC25A56EFC60000, offsetMsgId=C0A82A8300002A9F00000000000CF5C0, messageQueue=MessageQueue [topic=TopicTestFilter, brokerName=localhost.localdomain, queueId=0], queueOffset=31]
    Message{topic='TopicTestFilter', flag=0, properties={KEYS=KEY0, UNIQ_KEY=64774C6B275C18B4AAC25A56EFC60000, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 48], transactionId='null'}
    =========================================
    =========================================
    SendResult [sendStatus=SEND_OK, msgId=64774C6B275C18B4AAC25A56EFEE0001, offsetMsgId=C0A82A8300002A9F00000000000CF68F, messageQueue=MessageQueue [topic=TopicTestFilter, brokerName=localhost.localdomain, queueId=1], queueOffset=30]
    Message{topic='TopicTestFilter', flag=0, properties={KEYS=KEY1, UNIQ_KEY=64774C6B275C18B4AAC25A56EFEE0001, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49], transactionId='null'}
    =========================================
    =========================================
    SendResult [sendStatus=SEND_OK, msgId=64774C6B275C18B4AAC25A56EFF80002, offsetMsgId=C0A82A8300002A9F00000000000CF75E, messageQueue=MessageQueue [topic=TopicTestFilter, brokerName=localhost.localdomain, queueId=2], queueOffset=23]
    Message{topic='TopicTestFilter', flag=0, properties={KEYS=KEY2, UNIQ_KEY=64774C6B275C18B4AAC25A56EFF80002, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 50], transactionId='null'}
    =========================================
    =========================================
    SendResult [sendStatus=SEND_OK, msgId=64774C6B275C18B4AAC25A56EFFB0003, offsetMsgId=C0A82A8300002A9F00000000000CF82D, messageQueue=MessageQueue [topic=TopicTestFilter, brokerName=localhost.localdomain, queueId=3], queueOffset=22]
    Message{topic='TopicTestFilter', flag=0, properties={KEYS=KEY3, UNIQ_KEY=64774C6B275C18B4AAC25A56EFFB0003, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 51], transactionId='null'}
    =========================================
    =========================================
    SendResult [sendStatus=SEND_OK, msgId=64774C6B275C18B4AAC25A56EFFD0004, offsetMsgId=C0A82A8300002A9F00000000000CF8FC, messageQueue=MessageQueue [topic=TopicTestFilter, brokerName=localhost.localdomain, queueId=0], queueOffset=32]
    Message{topic='TopicTestFilter', flag=0, properties={KEYS=KEY4, UNIQ_KEY=64774C6B275C18B4AAC25A56EFFD0004, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 52], transactionId='null'}
    =========================================
    =========================================
    SendResult [sendStatus=SEND_OK, msgId=64774C6B275C18B4AAC25A56F0020005, offsetMsgId=C0A82A8300002A9F00000000000CF9CB, messageQueue=MessageQueue [topic=TopicTestFilter, brokerName=localhost.localdomain, queueId=1], queueOffset=31]
    Message{topic='TopicTestFilter', flag=0, properties={KEYS=KEY5, UNIQ_KEY=64774C6B275C18B4AAC25A56F0020005, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 53], transactionId='null'}
    =========================================
    13:00:47.034 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.42.131:10911] result: true
    13:00:47.036 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.42.131:9876] result: true
    
    Process finished with exit code 0
    
    

    消费者日志

    ========================================================
    ========================================================
    MessageExt [brokerName=localhost.localdomain, queueId=0, storeSize=207, queueOffset=31, sysFlag=0, bornTimestamp=1602997246918, bornHost=/192.168.42.1:6700, storeTimestamp=1602997246973, storeHost=/192.168.42.131:10911, msgId=C0A82A8300002A9F00000000000CF5C0, commitLogOffset=849344, bodyCRC=613185359, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTestFilter', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=33, KEYS=KEY0, CONSUME_START_TIME=1602997259086, UNIQ_KEY=64774C6B275C18B4AAC25A56EFC60000, CLUSTER=DefaultCluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 48], transactionId='null'}]
    Hello RocketMQ 0
    MessageExt [brokerName=localhost.localdomain, queueId=0, storeSize=207, queueOffset=32, sysFlag=0, bornTimestamp=1602997246973, bornHost=/192.168.42.1:6700, storeTimestamp=1602997247002, storeHost=/192.168.42.131:10911, msgId=C0A82A8300002A9F00000000000CF8FC, commitLogOffset=850172, bodyCRC=601994070, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTestFilter', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=33, KEYS=KEY4, CONSUME_START_TIME=1602997259086, UNIQ_KEY=64774C6B275C18B4AAC25A56EFFD0004, CLUSTER=DefaultCluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 52], transactionId='null'}]
    ========================================================
    ========================================================
    ========================================================
    Hello RocketMQ 4
    ========================================================
    MessageExt [brokerName=localhost.localdomain, queueId=1, storeSize=207, queueOffset=31, sysFlag=0, bornTimestamp=1602997246978, bornHost=/192.168.42.1:6700, storeTimestamp=1602997247011, storeHost=/192.168.42.131:10911, msgId=C0A82A8300002A9F00000000000CF9CB, commitLogOffset=850379, bodyCRC=1424393152, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTestFilter', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=32, KEYS=KEY5, CONSUME_START_TIME=1602997259086, UNIQ_KEY=64774C6B275C18B4AAC25A56F0020005, CLUSTER=DefaultCluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 53], transactionId='null'}]
    MessageExt [brokerName=localhost.localdomain, queueId=1, storeSize=207, queueOffset=30, sysFlag=0, bornTimestamp=1602997246958, bornHost=/192.168.42.1:6700, storeTimestamp=1602997246992, storeHost=/192.168.42.131:10911, msgId=C0A82A8300002A9F00000000000CF68F, commitLogOffset=849551, bodyCRC=1401636825, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTestFilter', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=32, KEYS=KEY1, CONSUME_START_TIME=1602997259086, UNIQ_KEY=64774C6B275C18B4AAC25A56EFEE0001, CLUSTER=DefaultCluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49], transactionId='null'}]
    Hello RocketMQ 5
    ========================================================
    Hello RocketMQ 1
    ========================================================
    
    目前尚无回复
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   我们的愿景   ·   实用小工具   ·   1643 人在线   最高记录 6543   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 29ms · UTC 16:56 · PVG 00:56 · LAX 09:56 · JFK 12:56
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.