亚洲精品久久久久久第一页-人妻少妇精彩视品一区二区三区-91国产自拍免费视频-免费一级a在线播放视频正片-少妇天天日天天射天天爽-国产大屁股喷水视频在线观看-操美女骚穴抽插性爱视频-亚洲 欧美 中文字幕 丝袜-成人免费无码片在线观看

kafka簡單的入門案例 kafka應用實例( 六 )


application.yml配置如下:server: port: 8080spring: kafka:bootstrap-servers: 192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094producer: # 生產(chǎn)者retries: 3 # 設置大于0的值,則客戶端會將發(fā)送失敗的記錄重新發(fā)送batch-size: 16384buffer-memory: 33554432acks: 1# 指定消息key和消息體的編解碼方式key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: default-groupenable-auto-commit: falseauto-offset-reset: earliestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerlistener:# 當每一條記錄被消費者監(jiān)聽器(ListenerConsumer)處理之后提交# RECORD# 當每一批poll()的數(shù)據(jù)被消費者監(jiān)聽器(ListenerConsumer)處理之后提交# BATCH# 當每一批poll()的數(shù)據(jù)被消費者監(jiān)聽器(ListenerConsumer)處理之后,距離上次提交時間大于TIME時提交# TIME# 當每一批poll()的數(shù)據(jù)被消費者監(jiān)聽器(ListenerConsumer)處理之后,被處理record數(shù)量大于等于COUNT時提交# COUNT# TIME | COUNT 有一個條件滿足時提交# COUNT_TIME# 當每一批poll()的數(shù)據(jù)被消費者監(jiān)聽器(ListenerConsumer)處理之后, 手動調(diào)用Acknowledgment.acknowledge()后提交# MANUAL# 手動調(diào)用Acknowledgment.acknowledge()后立即提交,一般使用這種# MANUAL_IMMEDIATEack-mode: manual_immediate生產(chǎn)者代碼:package com.yundasys.usercenter.collect.api.vo.req;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.web.bind.annotation.RequestMapping;/** * @program: usercenter-portrait-collect * @description: KafkaController * @author: yxh-word * @create: 2021-07-14 * @version: v1.0.0 創(chuàng)建文件, yxh-word, 2021-07-14 **/public class KafkaController {private final static String TOPIC_NAME = "my-replicated-topic";@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@RequestMapping("/send")public void send() {kafkaTemplate.send(TOPIC_NAME, 0, "key", "this is a msg");}}消費者代碼:package com.yundasys.usercenter.collect.api.vo.req;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.kafka.support.Acknowledgment;/** * @program: usercenter-portrait-collect * @description: MyConsumer * @author: yxh-word * @create: 2021-07-14 * @version: v1.0.0 創(chuàng)建文件, yxh-word, 2021-07-14 **/public class MyConsumer {/*** @KafkaListener(groupId = "testGroup", topicPartitions = {*@TopicPartition(topic = "topic1", partitions = {"0", "1"}),*@TopicPartition(topic = "topic2", partitions = "0",*partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))*},concurrency = "6")*//concurrency就是同組下的消費者個數(shù),就是并發(fā)消費數(shù),必須小于等于分區(qū)總數(shù)* @param record*/@KafkaListener(topics = "my-replicated-topic",groupId = "yundaGroup")public void listenZhugeGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {String value = https://www.520longzhigu.com/diannao/record.value();System.out.println(value);System.out.println(record);//手動提交offsetack.acknowledge();}/*//配置多個消費組@KafkaListener(topics ="my-replicated-topic",groupId = "likeGroup")public void listenTulingGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {String value = https://www.520longzhigu.com/diannao/record.value();System.out.println(value);System.out.println(record);ack.acknowledge();}*/}


以上關于本文的內(nèi)容,僅作參考!溫馨提示:如遇健康、疾病相關的問題,請您及時就醫(yī)或請專業(yè)人士給予相關指導!

「愛刨根生活網(wǎng)」www.malaban59.cn小編還為您精選了以下內(nèi)容,希望對您有所幫助: