博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
聊聊spring for kafka的AckMode
阅读量:7304 次
发布时间:2019-06-30

本文共 8517 字,大约阅读时间需要 28 分钟。

  hot3.png

本文主要讲述一下spring for kafka的consumer在spring.kafka.consumer.enable-auto-commit是false情况下,AckMode的选项

AckMode

spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/listener/AbstractMessageListenerContainer.java$AckMode

/**	 * The offset commit behavior enumeration.	 */	public enum AckMode {		/**		 * Commit after each record is processed by the listener.		 */		RECORD,		/**		 * Commit whatever has already been processed before the next poll.		 */		BATCH,		/**		 * Commit pending updates after		 * {@link ContainerProperties#setAckTime(long) ackTime} has elapsed.		 */		TIME,		/**		 * Commit pending updates after		 * {@link ContainerProperties#setAckCount(int) ackCount} has been		 * exceeded.		 */		COUNT,		/**		 * Commit pending updates after		 * {@link ContainerProperties#setAckCount(int) ackCount} has been		 * exceeded or after {@link ContainerProperties#setAckTime(long)		 * ackTime} has elapsed.		 */		COUNT_TIME,		/**		 * User takes responsibility for acks using an		 * {@link AcknowledgingMessageListener}.		 */		MANUAL,		/**		 * User takes responsibility for acks using an		 * {@link AcknowledgingMessageListener}. The consumer is woken to		 * immediately process the commit.		 */		MANUAL_IMMEDIATE,	}
  • RECORD 每处理一条commit一次
  • BATCH(默认) 每次poll的时候批量提交一次,频率取决于每次poll的调用频率
  • TIME 每次间隔ackTime的时间去commit(跟auto commit interval有什么区别呢?)
  • COUNT 累积达到ackCount次的ack去commit
  • COUNT_TIME ackTime或ackCount哪个条件先满足,就commit
  • MANUAL listener负责ack,但是背后也是批量上去
  • MANUAL_IMMEDIATE listner负责ack,每调用一次,就立即commit

KafkaMessageListenerContainer$ListenerConsumer

spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

@Override		public void run() {			if (this.theListener instanceof ConsumerSeekAware) {				((ConsumerSeekAware) this.theListener).registerSeekCallback(this);			}			this.count = 0;			this.last = System.currentTimeMillis();			if (isRunning() && this.definedPartitions != null) {				initPartitionsIfNeeded();				// we start the invoker here as there will be no rebalance calls to				// trigger it, but only if the container is not set to autocommit				// otherwise we will process records on a separate thread				if (!this.autoCommit) {					startInvoker();				}			}			long lastReceive = System.currentTimeMillis();			long lastAlertAt = lastReceive;			while (isRunning()) {				try {					if (!this.autoCommit) {						processCommits();					}					processSeeks();					if (this.logger.isTraceEnabled()) {						this.logger.trace("Polling (paused=" + this.paused + ")...");					}					ConsumerRecords
records = this.consumer.poll(this.containerProperties.getPollTimeout()); if (records != null && this.logger.isDebugEnabled()) { this.logger.debug("Received: " + records.count() + " records"); } if (records != null && records.count() > 0) { if (this.containerProperties.getIdleEventInterval() != null) { lastReceive = System.currentTimeMillis(); } // if the container is set to auto-commit, then execute in the // same thread // otherwise send to the buffering queue if (this.autoCommit) { invokeListener(records); } else { if (sendToListener(records)) { if (this.assignedPartitions != null) { // avoid group management rebalance due to a slow // consumer this.consumer.pause(this.assignedPartitions); this.paused = true; this.unsent = records; } } } } else { if (this.containerProperties.getIdleEventInterval() != null) { long now = System.currentTimeMillis(); if (now > lastReceive + this.containerProperties.getIdleEventInterval() && now > lastAlertAt + this.containerProperties.getIdleEventInterval()) { publishIdleContainerEvent(now - lastReceive); lastAlertAt = now; if (this.theListener instanceof ConsumerSeekAware) { seekPartitions(getAssignedPartitions(), true); } } } } this.unsent = checkPause(this.unsent); } catch (WakeupException e) { this.unsent = checkPause(this.unsent); } catch (Exception e) { if (this.containerProperties.getGenericErrorHandler() != null) { this.containerProperties.getGenericErrorHandler().handle(e, null); } else { this.logger.error("Container exception", e); } } } if (this.listenerInvokerFuture != null) { stopInvoker(); commitManualAcks(); } try { this.consumer.unsubscribe(); } catch (WakeupException e) { // No-op. Continue process } this.consumer.close(); if (this.logger.isInfoEnabled()) { this.logger.info("Consumer stopped"); } }

这里while循环每次都判断是否auto commit,如果不是则processCommits

private void processCommits() {			handleAcks();			this.count += this.acks.size();			long now;			AckMode ackMode = this.containerProperties.getAckMode();			if (!this.isManualImmediateAck) {				if (!this.isManualAck) {					updatePendingOffsets();				}				boolean countExceeded = this.count >= this.containerProperties.getAckCount();				if (this.isManualAck || this.isBatchAck || this.isRecordAck						|| (ackMode.equals(AckMode.COUNT) && countExceeded)) {					if (this.logger.isDebugEnabled() && ackMode.equals(AckMode.COUNT)) {						this.logger.debug("Committing in AckMode.COUNT because count " + this.count								+ " exceeds configured limit of " + this.containerProperties.getAckCount());					}					commitIfNecessary();					this.count = 0;				}				else {					now = System.currentTimeMillis();					boolean elapsed = now - this.last > this.containerProperties.getAckTime();					if (ackMode.equals(AckMode.TIME) && elapsed) {						if (this.logger.isDebugEnabled()) {							this.logger.debug("Committing in AckMode.TIME " +									"because time elapsed exceeds configured limit of " +									this.containerProperties.getAckTime());						}						commitIfNecessary();						this.last = now;					}					else if (ackMode.equals(AckMode.COUNT_TIME) && (elapsed || countExceeded)) {						if (this.logger.isDebugEnabled()) {							if (elapsed) {								this.logger.debug("Committing in AckMode.COUNT_TIME " +										"because time elapsed exceeds configured limit of " +										this.containerProperties.getAckTime());							}							else {								this.logger.debug("Committing in AckMode.COUNT_TIME " +										"because count " + this.count + " exceeds configured limit of" +										this.containerProperties.getAckCount());							}						}						commitIfNecessary();						this.last = now;						this.count = 0;					}				}			}		}

handleAcks

private void handleAcks() {			ConsumerRecord
record = this.acks.poll(); while (record != null) { if (this.logger.isTraceEnabled()) { this.logger.trace("Ack: " + record); } processAck(record); record = this.acks.poll(); } } private void processAck(ConsumerRecord
record) { if (ListenerConsumer.this.isManualImmediateAck) { try { ackImmediate(record); } catch (WakeupException e) { // ignore - not polling } } else { addOffset(record); } }

这里可以看到,如果不是isManualImmediateAck,则每次是累加到offsets的map中

commitIfNecessary

private void commitIfNecessary() {			Map
commits = new HashMap<>(); for (Entry
> entry : this.offsets.entrySet()) { for (Entry
offset : entry.getValue().entrySet()) { commits.put(new TopicPartition(entry.getKey(), offset.getKey()), new OffsetAndMetadata(offset.getValue() + 1)); } } this.offsets.clear(); if (this.logger.isDebugEnabled()) { this.logger.debug("Commit list: " + commits); } if (!commits.isEmpty()) { if (this.logger.isDebugEnabled()) { this.logger.debug("Committing: " + commits); } try { if (this.containerProperties.isSyncCommits()) { this.consumer.commitSync(commits); } else { this.consumer.commitAsync(commits, this.commitCallback); } } catch (WakeupException e) { // ignore - not polling if (this.logger.isDebugEnabled()) { this.logger.debug("Woken up during commit"); } } } }

这里会从offsets的map组装出commits,然后去提交(commitSync或者commitAsync),然后clear掉offsets

manual commit

@KafkaListener(topics = "k010")    public void listen(ConsumerRecord
cr,Acknowledgment ack) throws Exception { LOGGER.info(cr.toString()); ack.acknowledge(); }

方法参数里头传递Acknowledgment,然后手工ack 前提要配置AckMode

instance.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);

doc

转载于:https://my.oschina.net/go4it/blog/1550234

你可能感兴趣的文章
各种HTTPS站点的SSL证书 ,扩展SSL证书,密钥交换和身份验证机制汇总
查看>>
IOS UI总结
查看>>
【歪谈】重构浪漫:未来属于浪漫的80后
查看>>
还在抱怨JS文件里没有智能提示吗, VS10以及以上都可以 .NET
查看>>
如何生成IStyleGalleryItem和ISymbol对象的预览图(转载)
查看>>
数组插件----linq.js
查看>>
EF多表查询方式
查看>>
eclipse上传显示svn上传者名
查看>>
rsyslog日志服务的配置文件分析
查看>>
PCA降纬一步一步
查看>>
Swift3.0语言教程字符串大小写转化
查看>>
常用牛人主页链接(计算机视觉、模式识别、机器学习相关方向,陆续更新。。。。)...
查看>>
php composer 安装
查看>>
【30集iCore3_ADP出厂源代码(ARM部分)讲解视频】30-3 底层驱动之LED_蜂鸣器
查看>>
Netty 实现聊天功能
查看>>
从教20年随笔——4409的归来
查看>>
依据硬件设备配置高性能的Nginx
查看>>
STM32F429I-DISCO 和GPS的亲热接触
查看>>
baksmali反编译出现:UNEXPECTED TOP-LEVEL ERROR:....Too many open files
查看>>
查找存在某字符的文件列表,不包括svn文件
查看>>