标签归档:java

RocketMQ学习(七):消息的生命周期下之消息的消费

源代码版本是3.2.6。接着上一篇消息的产生,这篇是消息的消费。Consumer选择DefaultMQPushConsumer为例。 1.DefaultMQPushConsumer.start()开始。 2.RebalanceService.run()方法定时调用RebalanceImpl.doRebalance()方法,该方法内部是遍历订阅的topic,执行rebalanceByTopic(topic)。 3.调用RebalanceImpl.updateProcessQueueTableInRebalance(),构造PullRequest,从Broker获取nextOffset,pullRequest.setNextOffset(nextOffset),同时更新本地消费进度记录。 4.调用RebalancePushImpl.dispatchPullRequest(List)。 5.调用PullMessageService.executePullRequestImmediately(final PullRequest)放入pullRequestQueue队列中去。 6.PullMessageService.run()从pullRequestQueue队列中取出PullRequest,调用DefaultMQPushConsumerImpl.pullMessage(pullRequest)作拉取消息的动作。

发表在 编程语言 | 标签为 , | Comments Off on RocketMQ学习(七):消息的生命周期下之消息的消费

RocketMQ学习(六):消息的生命周期上之消息的产生

源代码版本是3.2.6。消息的生命周期包括2部分,消息的产生和消息的消费,这篇先说下前者。消息的产生详细一点可以分为: a.消息产生后由Producer发送至Broker。 b.Broker接收到消息做持久化。 调试代码得到这样的过程, 1.DefaultMQProducer.send()发出消息。 2.DefaultMQProducerImpl.sendDefaultImpl()发出消息。 3.DefaultMQProducerImpl.tryToFindTopicPublishInfo(),即向Namesrv发出GET_ROUTEINTO_BY_TOPIC的请求,来更新 MQProducerInner的topicPublishInfoTable和MQConsumerInner的topicSubscribeInfoTable。 4.调用topicPublishInfo.selectOneMessageQueue(),从发布的topic中轮询取出一个MessageQueue。默认一个topic对应4个MessageQueue。

发表在 编程语言 | 标签为 , | Comments Off on RocketMQ学习(六):消息的生命周期上之消息的产生

RocketMQ学习(五):Pull和Push

源代码版本是3.2.6。在rocketmq里,consumer被分为2类:MQPullConsumer和MQPushConsumer,其实本质都是拉模式(pull),即consumer轮询从broker拉取消息。 区别是: push方式里,consumer把轮询过程封装了,并注册MessageListener监听器,取到消息后,唤醒MessageListener的consumeMessage()来消费,对用户而言,感觉消息是被推送过来的。 pull方式里,取消息的过程需要用户自己写,首先通过打算消费的Topic拿到MessageQueue的集合,遍历MessageQueue集合,然后针对每个MessageQueue批量取消息,一次取完后,记录该队列下一次要取的开始offset,直到取完了,再换另一个MessageQueue。

发表在 编程语言 | 标签为 , | Comments Off on RocketMQ学习(五):Pull和Push

RocketMQ学习(四):rocketmq-filtersrv介绍和filter原理

源代码版本是3.2.6,还是直接跑源代码,启动配置参照前面写的《简介和quickstart》,启动顺序是namesrv,broker,filtersrv,filter和broker有顺序要求,如果filtersrv启动后找不到broker,则会System.exit()退出程序。 看下启动图: 看rocketmq-filtersrv代码,核心processor包下的只有一个Class类且只处理2种类型的请求,即DefaultRequestProcessor.processRequest()只处理RequestCode.REGISTER_MESSAGE_FILTER_CLASS和RequestCode.PULL_MESSAGE: REGISTER_MESSAGE_FILTER_CLASS:接收consumer端注册过来的filterClass源代码的请求。 PULL_MESSAGE:接收consumer端发出的拉消息的请求。

发表在 编程语言 | 标签为 , | Comments Off on RocketMQ学习(四):rocketmq-filtersrv介绍和filter原理

RocketMQ学习(三):rocketmq-namesrv介绍

刚刚拉了最新的代码,版本是3.2.6,直接NamesrvStartup类的main()方法启动,不需要带启动参数,启动序列图如下: 当broker,producer,consumer都运行后,namesrv一共有8类线程: 1.ServerHouseKeepingService:守护线程,本质是ChannelEventListener,监听broker的channel变化来更新本地的RouteInfo。 2.NSScheduledThread1:定时任务线程,定时跑2个任务,第一个是,每隔10分钟扫描出不活动的broker,然后从routeInfo中删除,第二个是,每个10分钟定时打印configTable的信息。 3.NettyBossSelector_1:Netty的boss线程(Accept线程),这里只有一根线程。 4.NettyEventExecuter:一个单独的线程,监听NettyChannel状态变化来通知ChannelEventListener做响应的动作。

发表在 编程语言 | 标签为 , | Comments Off on RocketMQ学习(三):rocketmq-namesrv介绍

RocketMQ学习(二):依赖关系和模块功能介绍

现在看的代码版本还是3.2.2 develop。先看张内部结构代码图: 从依赖层次再来看,越是被依赖的,越在底层: rocketmq包含9个子模块:

发表在 编程语言 | 标签为 , | Comments Off on RocketMQ学习(二):依赖关系和模块功能介绍

Netty5源码学习之buffer篇(一):PooledHeapByteBuf

PooledHeapByteBuf,带有池的堆内buffer,顾名思义,肯定比一般通过new出来的buffer性能好。把对象放入对象池缓存起来,一般都是因为创建该对象开销比较大,常见的有线程池(ThreadPool)、连接池(ConnectionPool)等。 PooledHeapByteBuf继承关系如下: PooledHeapByteBuf –》 PooledByteBuf –》 AbstractReferenceCountedByteBuf –》 AbstractByteBuf –》 ByteBuf。 继承关系比较简单清晰。 先介绍几个相关的类: PooledByteBufAllocator:buffer分配器,用来分配buffer(包括堆内和堆外)。 PoolArena:一块逻辑上的内存池,用来管理和组织buffer的,内部数据结构较复杂。 FastThreadLocal:较快的ThreadLocal(相对于jdk自带的),实现:线程T扩展于FastThreadLocalAccess,InternalThreadLocalMap是它的成员变量,set()时放入InternalThreadLocalMap的成员变量数组,下标是index,get()时从InternalThreadLocalMap的成员变量数组中下标是index处取。

发表在 编程语言 | 标签为 , , | Comments Off on Netty5源码学习之buffer篇(一):PooledHeapByteBuf

GraphicsMagick使用练习

GraphicsMagick官网地址:http://www.graphicsmagick.org/,介绍不多说网上都有,我这里只使用了它图片缩放功能,用java调用练习。 第一步:下载安装。 我下载的是:GraphicsMagick-1.3.20-Q8-win32-dll.exe,安装目录:C:\Program Files (x86)\GraphicsMagick-1.3.20-Q8。安装完毕后,安装目录自动配置到环境变量的Path中了,安装目录里有gm.exe。测试一下,gm -version,如果打印出GraphicsMagick相关信息则ok。这时就可以在命令行里使用gm命令了。 如果使用java调用报错:Caused by: java.io.IOException: CreateProcess error=2。则表示没有gm命令找不到,也就是说环境变量有可能更改了但是没有生效,我的做法是重启eclipse,运行同样程序就ok。如果还是报错,可能环境变量还是没有生效,重启电脑,又或者是安装目录压根就没有配置到环境变量Path中去,手动配置即可。 第二步:写代码。

发表在 编程语言 | 标签为 , , | Comments Off on GraphicsMagick使用练习

RocketMQ学习(一):简介和QuickStart

RocketMQ是什么? 引用官方描述: RocketMQ是一款分布式、队列模型的消息中间件,具有以下特点: 支持严格的消息顺序 支持Topic与Queue两种模式 亿级消息堆积能力 比较友好的分布式特性 同时支持Push与Pull方式消费消息 历经多次天猫双十一海量消息考验 RocketMQ是纯java编写,基于通信框架Netty。 代码地址:https://github.com/alibaba/RocketMQ,目前分支是3.2.2 develop。 下载完代码后,将各个模块导入eclipse,本地尝试启动看看。 1.启动nameServer,运行rocketmq-namesrv的NamesrvStartup,运行之前需设置环境变量ROCKETMQ_HOME为RocketMQ项目的根目录,这样有一个作用是,指向logback的配置文件路径,保证在nameServer启动时,logback的正常初始化。我本机设置的是:ROCKETMQ_HOME=C:\Users\Administrator\git\RocketMQ。 The Name Server boot success. 表示启动成功。 2.启动brokerServer,运行rocketmq-broker的BrokerStartup,同样,运行之前需设置环境变量ROCKETMQ_HOME,然后启动参数需要带上【-n “192.168.0.109:9876″】,我本机的ip是192.168.0.109。如果不带-n的参数,那么broker会去访问http://jmenv.tbsite.net:8080/rocketmq/nsaddr获取nameServer的地址,这个地址不是我们自己的nameServer。 The broker[LENOVO-PC, 192.168.0.109:10911] boot success. and name server is 192.168.0.109:9876表示成功。 3.这个非必选项,不运行也可以。还可以启动rocketmq-srvutil的FiltersrvStartup,这是Consumer使用Java代码,在服务器做消息过滤。启动方式和broker一样,具体的过滤原理以后再详细的说。 到此就可以运行demo了。

发表在 编程语言 | 标签为 , | Comments Off on RocketMQ学习(一):简介和QuickStart

log4j几个tips

log4j是很常用的日志框架,这里总结几个小知识点: 1. logger是以名称为key,logger为value的形式存储在Hashtable里,所以,logger作为入参不需要传入引用,直接输入名称get即可。 LogFactoryImpl.java 2. logger的继承关系是根据名称的“.”分割来区分,表现为子logger打印日志时遍历父logger里的appender的进行日志打印。 Category.java 3.定义模块名为常量,按模块名称取logger,解决项目里包名类名相同日志无法区分问题。 贴代码: 示例一 示例二 练习代码看这里

发表在 编程语言 | 标签为 , | Comments Off on log4j几个tips