application.yml配置如下:server: port: 8080spring: kafka:bootstrap-servers: 192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094producer: # 生產(chǎn)者retries: 3 # 設置大于0的值,則客戶端會將發(fā)送失敗的記錄重新發(fā)送batch-size: 16384buffer-memory: 33554432acks: 1# 指定消息key和消息體的編解碼方式key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: default-groupenable-auto-commit: falseauto-offset-reset: earliestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerlistener:# 當每一條記錄被消費者監(jiān)聽器(ListenerConsumer)處理之后提交# RECORD# 當每一批poll()的數(shù)據(jù)被消費者監(jiān)聽器(ListenerConsumer)處理之后提交# BATCH# 當每一批poll()的數(shù)據(jù)被消費者監(jiān)聽器(ListenerConsumer)處理之后,距離上次提交時間大于TIME時提交# TIME# 當每一批poll()的數(shù)據(jù)被消費者監(jiān)聽器(ListenerConsumer)處理之后,被處理record數(shù)量大于等于COUNT時提交# COUNT# TIME | COUNT 有一個條件滿足時提交# COUNT_TIME# 當每一批poll()的數(shù)據(jù)被消費者監(jiān)聽器(ListenerConsumer)處理之后, 手動調(diào)用Acknowledgment.acknowledge()后提交# MANUAL# 手動調(diào)用Acknowledgment.acknowledge()后立即提交,一般使用這種# MANUAL_IMMEDIATEack-mode: manual_immediate生產(chǎn)者代碼:package com.yundasys.usercenter.collect.api.vo.req;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.web.bind.annotation.RequestMapping;/** * @program: usercenter-portrait-collect * @description: KafkaController * @author: yxh-word * @create: 2021-07-14 * @version: v1.0.0 創(chuàng)建文件, yxh-word, 2021-07-14 **/public class KafkaController {private final static String TOPIC_NAME = "my-replicated-topic";@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@RequestMapping("/send")public void send() {kafkaTemplate.send(TOPIC_NAME, 0, "key", "this is a msg");}}消費者代碼:package com.yundasys.usercenter.collect.api.vo.req;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.kafka.support.Acknowledgment;/** * @program: usercenter-portrait-collect * @description: MyConsumer * @author: yxh-word * @create: 2021-07-14 * @version: v1.0.0 創(chuàng)建文件, yxh-word, 2021-07-14 **/public class MyConsumer {/*** @KafkaListener(groupId = "testGroup", topicPartitions = {*@TopicPartition(topic = "topic1", partitions = {"0", "1"}),*@TopicPartition(topic = "topic2", partitions = "0",*partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))*},concurrency = "6")*//concurrency就是同組下的消費者個數(shù),就是并發(fā)消費數(shù),必須小于等于分區(qū)總數(shù)* @param record*/@KafkaListener(topics = "my-replicated-topic",groupId = "yundaGroup")public void listenZhugeGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {String value = https://www.520longzhigu.com/diannao/record.value();System.out.println(value);System.out.println(record);//手動提交offsetack.acknowledge();}/*//配置多個消費組@KafkaListener(topics ="my-replicated-topic",groupId = "likeGroup")public void listenTulingGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {String value = https://www.520longzhigu.com/diannao/record.value();System.out.println(value);System.out.println(record);ack.acknowledge();}*/}
以上關于本文的內(nèi)容,僅作參考!溫馨提示:如遇健康、疾病相關的問題,請您及時就醫(yī)或請專業(yè)人士給予相關指導!
「愛刨根生活網(wǎng)」www.malaban59.cn小編還為您精選了以下內(nèi)容,希望對您有所幫助:- 一般的玉手鐲賣多少錢:翡翠項鏈編織繩,好的項鏈要配獨一無二的繩子
- 一般的玉手鐲賣多少錢:翡翠項鏈鑒定機構,專業(yè)的就是讓人信賴
- 2022年天貓年貨節(jié)紅包入口在哪里 天貓年貨節(jié)紅包口令怎么設置的
- 2022大連泉水疫情源頭在哪 大連疫情活動軌跡是怎樣的
- 蘇打水是碳酸飲料嗎
- 文玩核桃怎么清洗
- 坐飛機的流程步驟有哪些?
- 原唱者王琪攜手許歌淳主演院線電影 可可托海的牧羊人原唱
- 鏡子是用什么材料做的
- 筋膜槍怎么使用
