簡介
Flink-kafka-connector用來做什么?
Kafka中的partition機(jī)制和Flink的并行度機(jī)制結(jié)合,實(shí)現(xiàn)數(shù)據(jù)恢復(fù)
Kafka可以作為Flink的source和sink
任務(wù)失敗,通過設(shè)置kafka的offset來恢復(fù)應(yīng)用
kafka簡單介紹
關(guān)于kafka,我們會(huì)有專題文章介紹,這里簡單介紹幾個(gè)必須知道的概念。
1.生產(chǎn)者(Producer)
顧名思義,生產(chǎn)者就是生產(chǎn)消息的組件,它的主要工作就是源源不斷地生產(chǎn)出消息,然后發(fā)送給消息隊(duì)列。生產(chǎn)者可以向消息隊(duì)列發(fā)送各種類型的消息,如狹義的字符串消息,也可以發(fā)送二進(jìn)制消息。生產(chǎn)者是消息隊(duì)列的數(shù)據(jù)源,只有通過生產(chǎn)者持續(xù)不斷地向消息隊(duì)列發(fā)送消息,消息隊(duì)列才能不斷處理消息。
2.消費(fèi)者(Consumer)
所謂消費(fèi)者,指的是不斷消費(fèi)(獲取)消息的組件,它獲取消息的來源就是消息隊(duì)列(即Kafka本身)。換句話說,生產(chǎn)者不斷向消息隊(duì)列發(fā)送消息,而消費(fèi)者則不斷從消息隊(duì)列中獲取消息。
3.主題(Topic)
主題是Kafka中一個(gè)極為重要的概念。首先,主題是一個(gè)邏輯上的概念,它用于從邏輯上來歸類與存儲(chǔ)消息本身。多個(gè)生產(chǎn)者可以向一個(gè)Topic發(fā)送消息,同時(shí)也可以有多個(gè)消費(fèi)者消費(fèi)一個(gè)Topic中的消息。Topic還有分區(qū)和副本的概念。Topic與消息這兩個(gè)概念之間密切相關(guān),Kafka中的每一條消息都?xì)w屬于某一個(gè)Topic,而一個(gè)Topic下面可以有任意數(shù)量的消息。
kafka簡單操作
啟動(dòng)zk:nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
啟動(dòng)server: nohup bin/kafka-server-start.sh config/server.properties &
創(chuàng)建一個(gè)topic:bin/kafka-topics.sh --create --zookeeper localhost:2181
--replication-factor 1 --partitions 1 --topic test
查看topic:bin/kafka-topics.sh --list --zookeeper localhost:2181
發(fā)送數(shù)據(jù):bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
啟動(dòng)一個(gè)消費(fèi)者:bin/kafka-console-consumer.sh --bootstrap-server localhost:9092
--topic test --from-beginning
刪除topic: bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic topn
Flink消費(fèi)Kafka注意事項(xiàng)
*
setStartFromGroupOffsets()【默認(rèn)消費(fèi)策略】
默認(rèn)讀取上次保存的offset信息 如果是應(yīng)用第一次啟動(dòng),讀取不到上次的offset信息,則會(huì)根據(jù)這個(gè)參數(shù)auto.offset.reset的值來進(jìn)行消費(fèi)數(shù)據(jù)
* setStartFromEarliest()
從最早的數(shù)據(jù)開始進(jìn)行消費(fèi),忽略存儲(chǔ)的offset信息
* setStartFromLatest()
從最新的數(shù)據(jù)進(jìn)行消費(fèi),忽略存儲(chǔ)的offset信息
* setStartFromSpecificOffsets(Map)
從指定位置進(jìn)行消費(fèi)
*
當(dāng)checkpoint機(jī)制開啟的時(shí)候,KafkaConsumer會(huì)定期把kafka的offset信息還有其他operator的狀態(tài)信息一塊保存起來。當(dāng)job失敗重啟的時(shí)候,F(xiàn)link會(huì)從最近一次的checkpoint中進(jìn)行恢復(fù)數(shù)據(jù),重新消費(fèi)kafka中的數(shù)據(jù)。
* 為了能夠使用支持容錯(cuò)的kafka Consumer,需要開啟checkpoint
env.enableCheckpointing(5000); // 每5s checkpoint一次
搭建Kafka單機(jī)環(huán)境
我本地安裝了一個(gè)kafka_2.11-2.1.0版本的kafka
啟動(dòng)Zookeeper和kafka server:
啟動(dòng)zk:nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
啟動(dòng)server: nohup bin/kafka-server-start.sh config/server.properties &
創(chuàng)建一個(gè)topic:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1
--partitions 1 --topic test
實(shí)戰(zhàn)案例
Kafka作為Flink Sink
首先pom依賴:
<dependency> <groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId> <version>1.7.0</version>
</dependency>
向kafka寫入數(shù)據(jù):
public class KafkaProducer { public static void main(String[] args) throws
Exception{ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String>
text = env.addSource(new MyNoParalleSource()).setParallelism(1); Properties
properties = new Properties(); properties.setProperty("bootstrap.servers",
"localhost:9092"); //new FlinkKafkaProducer("topn",new
KeyedSerializationSchemaWrapper(new
SimpleStringSchema()),properties,FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
FlinkKafkaProducer<String> producer = new FlinkKafkaProducer("test",new
SimpleStringSchema(),properties); /* //event-timestamp事件的發(fā)生時(shí)間
producer.setWriteTimestampToKafka(true); */ text.addSink(producer);
env.execute(); } }//
大家這里特別注意,我們實(shí)現(xiàn)了一個(gè)并行度為1的MyNoParalleSource來生產(chǎn)數(shù)據(jù),代碼如下:
//使用并行度為1的source public class MyNoParalleSource implements
SourceFunction<String> {//1 //private long count = 1L; private boolean
isRunning = true; /** * 主要的方法 * 啟動(dòng)一個(gè)source *
大部分情況下,都需要在這個(gè)run方法中實(shí)現(xiàn)一個(gè)循環(huán),這樣就可以循環(huán)產(chǎn)生數(shù)據(jù)了 * * @param ctx * @throws Exception */
@Override public void run(SourceContext<String> ctx) throws Exception {
while(isRunning){ //圖書的排行榜 List<String> books = new ArrayList<>();
books.add("Pyhton從入門到放棄");//10 books.add("Java從入門到放棄");//8
books.add("Php從入門到放棄");//5 books.add("C++從入門到放棄");//3
books.add("Scala從入門到放棄");//0-4 int i = new Random().nextInt(5);
ctx.collect(books.get(i)); //每2秒產(chǎn)生一條數(shù)據(jù) Thread.sleep(2000); } }
//取消一個(gè)cancel的時(shí)候會(huì)調(diào)用的方法 @Override public void cancel() { isRunning = false; } }
代碼實(shí)現(xiàn)了一個(gè)發(fā)送器,來發(fā)送書名等...
然后右鍵運(yùn)行我們的程序,控制臺(tái)輸出如下:
開始源源不斷的生產(chǎn)數(shù)據(jù)了。
然后我們用命令去查看一下 kafka test這個(gè)topic:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
--from-beginning
輸出如下:
Kafka作為Flink Source
直接上代碼:
public class KafkaConsumer { public static void main(String[] args) throws
Exception{ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(); Properties properties =
new Properties(); properties.setProperty("bootstrap.servers",
"localhost:9092"); FlinkKafkaConsumer<String> consumer = new
FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties); //從最早開始消費(fèi)
consumer.setStartFromEarliest(); DataStream<String> stream = env
.addSource(consumer); stream.print(); //stream.map(); env.execute(); } }//
控制臺(tái)輸出如下:
將我們之前發(fā)往kafka的消息全部打印出來了。
熱門工具 換一換