RocketMQ学习(九):顺序消息

rocketmq的顺序消息需要满足2点:

1.Producer端保证发送消息有序,且发送到同一个队列。
2.consumer端保证消费同一个队列。

先看个例子,代码版本跟前面的一样。
Producer类:

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;

import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.MessageQueueSelector;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageQueue;
import com.alibaba.rocketmq.remoting.exception.RemotingException;


/**
 * Producer,发送顺序消息
 */
public class Producer {
    public static void main(String[] args) throws IOException {
        try {
            DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");

            producer.setNamesrvAddr("192.168.0.104:9876");

            producer.start();

            String[] tags = new String[] { "TagA", "TagC", "TagD" };

            Date date = new Date();
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            String dateStr = sdf.format(date);
            for (int i = 0; i < 10; i++) {
                // 加个时间后缀
                String body = dateStr + " Hello RocketMQ " + i;
                Message msg = new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i, body.getBytes());

                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        Integer id = (Integer) arg;
                        return mqs.get(id);
                    }
                }, 0);//0是队列的下标

                System.out.println(sendResult + ", body:" + body);
            }

            producer.shutdown();
        } catch (MQClientException e) {
            e.printStackTrace();
        } catch (RemotingException e) {
            e.printStackTrace();
        } catch (MQBrokerException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.in.read();
    }
}

继续阅读

发表在 编程语言 | 标签为 , | Comments Off on RocketMQ学习(九):顺序消息

cassandra入门(三):便捷的@Accessor注解

如果说,入门二的用法是hibernate,那么这篇入门三的用法就更像ibatis。

完整的代码请参考:https://github.com/yankai913/cassandra-java-userguide。

由于本文复用了入门二的部分代码,所以这里只贴新增的。

新增表:

CREATE TABLE complex.users (
id uuid PRIMARY KEY,
name text,
addresses map<text,frozen<address>>
);

继续阅读

发表在 数据库, 编程语言 | 标签为 , | Comments Off on cassandra入门(三):便捷的@Accessor注解

cassandra入门(二):自定义类型使用和ORM

直接贴代码,cql和代码有些地方与分享电子书里的javaDriver21.pdf有些出入,请以博文为准,cql和代码都是实测跑通的。

cql脚本(在cqlsh.bat窗口里跑):

CREATE KEYSPACE complex
WITH replication = {'class' : 'SimpleStrategy', 'replication_factor' :
3};
CREATE TYPE complex.phone (
alias text,
number text
);
CREATE TYPE complex.address (
street text,
city text,
zip_code int,
phones list<frozen<phone>>
);
CREATE TABLE complex.accounts (
email text PRIMARY KEY,
name text,
addr frozen<address>
);

自定义类型:phone和address。
表:accounts 。
继续阅读

发表在 数据库, 编程语言 | 标签为 , | Comments Off on cassandra入门(二):自定义类型使用和ORM

cassandra入门(一):jdbc连接cassandra作增删改查

先分享一个最新的cassandra-java-driver文档,点击电子书分享里的链接,找到javaDriver21.pdf。

该文档内容比较全,包含:jdbc连接cassandra集群,执行cql增删改查,批量查询,异步查询,cql的类型和java类型的映射关系及用户自定义类型使用,ORM等。

Cassandra是一个NoSql数据库,纯java编写,apache的顶级项目,主页:http://cassandra.apache.org/(简介不多说网上有)。

入门步骤:(我的jdk版本是1.7.0_71,win7系统)

1.去主页下载cassandra,我下载的是apache-cassandra-2.1.9,然后bin/cassandra.bat启动数据库,如果想使用bin/cqlsh.bat则需要安装python2.7。

2.贴代码

		<dependency>
			<groupId>com.datastax.cassandra</groupId>
			<artifactId>cassandra-driver-core</artifactId>
			<version>2.1.5</version>
		</dependency>

继续阅读

发表在 数据库, 编程语言 | 标签为 , | Comments Off on cassandra入门(一):jdbc连接cassandra作增删改查

RocketMQ学习(八):事务消息

源代码版本是3.2.6,还是直接跑源代码。rocketmq事务消息是发生在Producer和Broker之间,是二阶段提交。

二阶段提交过程看图:
事务逻辑

第一阶段是:步骤1,2,3。
第二阶段是:步骤4,5。

具体说明:

只有在消息发送成功,并且本地操作执行成功时,才发送提交事务消息,做事务提交。

其他的情况,例如消息发送失败,直接发送回滚消息,进行回滚,或者发送消息成功,但是执行本地操作失败,也是发送回滚消息,进行回滚。
继续阅读

发表在 编程语言 | 标签为 , , | Comments Off on RocketMQ学习(八):事务消息

RocketMQ学习(七):消息的生命周期下之消息的消费

源代码版本是3.2.6。接着上一篇消息的产生,这篇是消息的消费。Consumer选择DefaultMQPushConsumer为例。

1.DefaultMQPushConsumer.start()开始。

2.RebalanceService.run()方法定时调用RebalanceImpl.doRebalance()方法,该方法内部是遍历订阅的topic,执行rebalanceByTopic(topic)。

3.调用RebalanceImpl.updateProcessQueueTableInRebalance(),构造PullRequest,从Broker获取nextOffset,pullRequest.setNextOffset(nextOffset),同时更新本地消费进度记录。

4.调用RebalancePushImpl.dispatchPullRequest(List)。

5.调用PullMessageService.executePullRequestImmediately(final PullRequest)放入pullRequestQueue队列中去。

6.PullMessageService.run()从pullRequestQueue队列中取出PullRequest,调用DefaultMQPushConsumerImpl.pullMessage(pullRequest)作拉取消息的动作。

继续阅读

发表在 编程语言 | 标签为 , | Comments Off on RocketMQ学习(七):消息的生命周期下之消息的消费

RocketMQ学习(六):消息的生命周期上之消息的产生

源代码版本是3.2.6。消息的生命周期包括2部分,消息的产生和消息的消费,这篇先说下前者。消息的产生详细一点可以分为:

a.消息产生后由Producer发送至Broker。

b.Broker接收到消息做持久化。

调试代码得到这样的过程,

1.DefaultMQProducer.send()发出消息。

2.DefaultMQProducerImpl.sendDefaultImpl()发出消息。

3.DefaultMQProducerImpl.tryToFindTopicPublishInfo(),即向Namesrv发出GET_ROUTEINTO_BY_TOPIC的请求,来更新
MQProducerInner的topicPublishInfoTable和MQConsumerInner的topicSubscribeInfoTable。

4.调用topicPublishInfo.selectOneMessageQueue(),从发布的topic中轮询取出一个MessageQueue。默认一个topic对应4个MessageQueue。

继续阅读

发表在 编程语言 | 标签为 , | Comments Off on RocketMQ学习(六):消息的生命周期上之消息的产生

RocketMQ学习(五):Pull和Push

源代码版本是3.2.6。在rocketmq里,consumer被分为2类:MQPullConsumer和MQPushConsumer,其实本质都是拉模式(pull),即consumer轮询从broker拉取消息。

区别是:

push方式里,consumer把轮询过程封装了,并注册MessageListener监听器,取到消息后,唤醒MessageListener的consumeMessage()来消费,对用户而言,感觉消息是被推送过来的。

pull方式里,取消息的过程需要用户自己写,首先通过打算消费的Topic拿到MessageQueue的集合,遍历MessageQueue集合,然后针对每个MessageQueue批量取消息,一次取完后,记录该队列下一次要取的开始offset,直到取完了,再换另一个MessageQueue。

继续阅读

发表在 编程语言 | 标签为 , | Comments Off on RocketMQ学习(五):Pull和Push

RocketMQ学习(四):rocketmq-filtersrv介绍和filter原理

源代码版本是3.2.6,还是直接跑源代码,启动配置参照前面写的《简介和quickstart》,启动顺序是namesrv,broker,filtersrv,filter和broker有顺序要求,如果filtersrv启动后找不到broker,则会System.exit()退出程序。

看下启动图:
filtersrv_start

看rocketmq-filtersrv代码,核心processor包下的只有一个Class类且只处理2种类型的请求,即DefaultRequestProcessor.processRequest()只处理RequestCode.REGISTER_MESSAGE_FILTER_CLASS和RequestCode.PULL_MESSAGE:

REGISTER_MESSAGE_FILTER_CLASS:接收consumer端注册过来的filterClass源代码的请求。

PULL_MESSAGE:接收consumer端发出的拉消息的请求。

继续阅读

发表在 编程语言 | 标签为 , | Comments Off on RocketMQ学习(四):rocketmq-filtersrv介绍和filter原理

RocketMQ学习(三):rocketmq-namesrv介绍

刚刚拉了最新的代码,版本是3.2.6,直接NamesrvStartup类的main()方法启动,不需要带启动参数,启动序列图如下:

namesrv_start

当broker,producer,consumer都运行后,namesrv一共有8类线程:

1.ServerHouseKeepingService:守护线程,本质是ChannelEventListener,监听broker的channel变化来更新本地的RouteInfo。

2.NSScheduledThread1:定时任务线程,定时跑2个任务,第一个是,每隔10分钟扫描出不活动的broker,然后从routeInfo中删除,第二个是,每个10分钟定时打印configTable的信息。

3.NettyBossSelector_1:Netty的boss线程(Accept线程),这里只有一根线程。

4.NettyEventExecuter:一个单独的线程,监听NettyChannel状态变化来通知ChannelEventListener做响应的动作。

继续阅读

发表在 编程语言 | 标签为 , | Comments Off on RocketMQ学习(三):rocketmq-namesrv介绍