傳智播客旗下品牌:|||||

全國咨詢/投訴熱線:400-618-4000

Kafka的常用API介紹[大數據培訓]

更新時間:2020年01月03日13時44分 來源:傳智播客 瀏覽次數:

傳智播客



一、消息發送

1.異步發送

1)導入依賴

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.0</version>
</dependency>


 
2)編寫代碼
需要用到的類:
KafkaProducer:需要創建一個生產者對象,用來發送數據
ProducerConfig:獲取所需的一系列配置參數
ProducerRecord:每條數據都要封裝成一個ProducerRecord對象
 

1.不帶回調函數的API

package com.heima.kafka;
 
import org.apache.kafka.clients.producer.*;
 
import java.util.Properties;
import java.util.concurrent.ExecutionException;
 
public class CustomProducer {
 
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop102:9092");//kafka集群,broker-list
        props.put("acks", "all");
        props.put("retries", 1);//重試次數
        props.put("batch.size", 16384);//批次大小
        props.put("linger.ms", 1);//等待時間
        props.put("buffer.memory", 33554432);//RecordAccumulator緩沖區大小
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 
        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 100; i++) {
            producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), Integer.toString(i)));
        }
        producer.close();
    }
}



2.帶回調函數的API

回調函數會在producer收到ack時調用,為異步調用,該方法有兩個參數,分別是RecordMetadata和Exception,如果Exception為null,說明消息發送成功,如果Exception不為null,說明消息發送失敗。
注意:消息發送失敗會自動重試,不需要我們在回調函數中手動重試。

package com.heima.kafka;
 
import org.apache.kafka.clients.producer.*;
 
import java.util.Properties;
import java.util.concurrent.ExecutionException;
 
public class CustomProducer {
 
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop102:9092");//kafka集群,broker-list
        props.put("acks", "all");
        props.put("retries", 1);//重試次數
        props.put("batch.size", 16384);//批次大小
        props.put("linger.ms", 1);//等待時間
        props.put("buffer.memory", 33554432);//RecordAccumulator緩沖區大小
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 
        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 100; i++) {
            producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), Integer.toString(i)), new Callback() {
 
                //回調函數,該方法會在Producer收到ack時調用,為異步調用
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.println("success->" + metadata.offset());
                    } else {
                        exception.printStackTrace();
                    }
                }
            });
        }
        producer.close();
    }
}

 

2.同步發送

同步發送的意思就是,一條消息發送之后,會阻塞當前線程,直至返回ack。
由于send方法返回的是一個Future對象,根據Futrue對象的特點,我們也可以實現同步發送的效果,只需在調用Future對象的get方發即可。

package com.heima.kafka;
 
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
 
import java.util.Properties;
import java.util.concurrent.ExecutionException;
 
public class CustomProducer {
 
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop102:9092");//kafka集群,broker-list
        props.put("acks", "all");
        props.put("retries", 1);//重試次數
        props.put("batch.size", 16384);//批次大小
        props.put("linger.ms", 1);//等待時間
        props.put("buffer.memory", 33554432);//RecordAccumulator緩沖區大小
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 
        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 100; i++) {
            producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), Integer.toString(i))).get();
        }
        producer.close();
    }
}
 

二、消息消費

Consumer消費數據時的可靠性是很容易保證的,因為數據在Kafka中是持久化的,故不用擔心數據丟失問題。
由于consumer在消費過程中可能會出現斷電宕機等故障,consumer恢復后,需要從故障前的位置的繼續消費,所以consumer需要實時記錄自己消費到了哪個offset,以便故障恢復后繼續消費。所以offset的維護是Consumer消費數據是必須考慮的問題。


1。自動提交offset

1)導入依賴

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.0</version>
</dependency>


2)編寫代碼
需要用到的類:
KafkaConsumer:需要創建一個消費者對象,用來消費數據
ConsumerConfig:獲取所需的一系列配置參數
ConsuemrRecord:每條數據都要封裝成一個ConsumerRecord對象
為了使我們能夠專注于自己的業務邏輯,Kafka提供了自動提交offset的功能。
自動提交offset的相關參數:
enable.auto.commit是否開啟自動提交offset功能
auto.commit.interval.ms自動提交offset的時間間隔
以下為自動提交offset的代碼:

package com.heima.kafka;
 
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
 
import java.util.Arrays;
import java.util.Properties;
 
public class CustomConsumer {
 
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop102:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("first"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }
}

 

2.手動提交offset

雖然自動提交offset十分簡介便利,但由于其是基于時間提交的,開發人員難以把握offset提交的時機。因此Kafka還提供了手動提交offset的API。
手動提交offset的方法有兩種:分別是commitSync(同步提交)和commitAsync(異步提交)。兩者的相同點是,都會將本次poll的一批數據最高的偏移量提交;不同點是,commitSync阻塞當前線程,一直到提交成功,并且會自動失敗重試(由不可控因素導致,也會出現提交失敗);而commitAsync則沒有失敗重試機制,故有可能提交失敗。

1)同步提交offset
由于同步提交offset有失敗重試機制,故更加可靠,以下為同步提交offset的示例。

package com.heima.kafka.consumer;
 
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
 
import java.util.Arrays;
import java.util.Properties;
 
/**
 * @author liubo
 */
public class CustomComsumer {
 
    public static void main(String[] args) {
 
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop102:9092");//Kafka集群
        props.put("group.id", "test");//消費者組,只要group.id相同,就屬于同一個消費者組
        props.put("enable.auto.commit", "false");//關閉自動提交offset
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
 
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("first"));//消費者訂閱主題
 
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);//消費者拉取數據
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
            consumer.commitSync();//同步提交,當前線程會阻塞知道offset提交成功
        }
    }
}


2)異步提交offset
雖然同步提交offset更可靠一些,但是由于其會阻塞當前線程,直到提交成功。因此吞吐量會收到很大的影響。因此更多的情況下,會選用異步提交offset的方式。
以下為異步提交offset的示例:

package com.heima.kafka.consumer;
 
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
 
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
 
/**
 * @author liubo
 */
public class CustomConsumer {
 
    public static void main(String[] args) {
 
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop102:9092");//Kafka集群
        props.put("group.id", "test");//消費者組,只要group.id相同,就屬于同一個消費者組
        props.put("enable.auto.commit", "false");//關閉自動提交offset
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
 
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("first"));//消費者訂閱主題
 
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);//消費者拉取數據
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
            consumer.commitAsync(new OffsetCommitCallback() {
                @Override
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                    if (exception != null) {
                        System.err.println("Commit failed for" + offsets);
                    }
                }
            });//異步提交
        }
    }
}

 
Kafka的常用API


3。自定義存儲offset

Kafka 0.9版本之前,offset存儲在zookeeper,0.9版本之后,默認將offset存儲在Kafka的一個內置的topic中。除此之外,Kafka還可以選擇自定義存儲offset。
Offset的維護是相當繁瑣的,因為需要考慮到消費者的Rebalace。

當有新的消費者加入消費者組、已有的消費者推出消費者組或者所訂閱的主題的分區發生變化,就會觸發到分區的重新分配,重新分配的過程叫做Rebalance。
消費者發生Rebalance之后,每個消費者消費的分區就會發生變化。因此消費者要首先獲取到自己被重新分配到的分區,并且定位到每個分區最近提交的offset位置繼續消費。
要實現自定義存儲offset,需要借助ConsumerRebalanceListener,以下為示例代碼,其中提交和獲取offset的方法,需要根據所選的offset存儲系統自行實現。

package com.heima.kafka.consumer;
 
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
 
import java.util.*;
 
/**
 * @author liubo
 */
public class CustomConsumer {
 
    private static Map<TopicPartition, Long> currentOffset = new HashMap<>();
 
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop102:9092");//Kafka集群
        props.put("group.id", "test");//消費者組,只要group.id相同,就屬于同一個消費者組
        props.put("enable.auto.commit", "false");//關閉自動提交offset
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
 
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("first"), new ConsumerRebalanceListener() {
           
            //該方法會在Rebalance之前調用
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                commitOffset(currentOffset);
            }
 
            //該方法會在Rebalance之后調用
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                currentOffset.clear();
                for (TopicPartition partition : partitions) {
                    consumer.seek(partition, getOffset(partition));//定位到最近提交的offset位置繼續消費
                }
            }
        });
 
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);//消費者拉取數據
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                currentOffset.put(new TopicPartition(record.topic(), record.partition()), record.offset());
            }
            commitOffset(currentOffset);
        }
    }
 
    //獲取某分區的最新offset
    private static long getOffset(TopicPartition partition) {
        return 0;
    }
 
    //提交該消費者所有分區的offset
    private static void commitOffset(Map<TopicPartition, Long> currentOffset) {
 
    }
}

 


javaee

python

web

ui

cloud

test

c

netmarket

pm

Linux

movies

robot

uids

北京校區

    14天免費試學

    基礎班入門課程限時免費

    申請試學名額

    15天免費試學

    基礎班入門課程限時免費

    申請試學名額

    15天免費試學

    基礎班入門課程限時免費

    申請試學名額

    15天免費試學

    基礎班入門課程限時免費

    申請試學名額

    20天免費試學

    基礎班入門課程限時免費

    申請試學名額

    8天免費試學

    基礎班入門課程限時免費

    申請試學名額

    20天免費試學

    基礎班入門課程限時免費

    申請試學名額

    5天免費試學

    基礎班入門課程限時免費

    申請試學名額

    0天免費試學

    基礎班入門課程限時免費

    申請試學名額

    12天免費試學

    基礎班入門課程限時免費

    申請試學名額

    5天免費試學

    基礎班入門課程限時免費

    申請試學名額

    5天免費試學

    基礎班入門課程限時免費

    申請試學名額

    10天免費試學

    基礎班入門課程限時免費

    申請試學名額
    2019最新网赚工具 千禧彩票是真的吗 2019最新挂机网赚软件 2019免费挂机网赚 网赚彩票群可靠吗 陕西11选5走势图 2019真实网赚 中文点击网赚新手入门 任我赢机器人 19年网赚钱商机