簡介

          Flink-kafka-connector用來做什么?

          Kafka中的partition機(jī)制和Flink的并行度機(jī)制結(jié)合,實(shí)現(xiàn)數(shù)據(jù)恢復(fù)
          Kafka可以作為Flink的source和sink
          任務(wù)失敗,通過設(shè)置kafka的offset來恢復(fù)應(yīng)用

          kafka簡單介紹

          關(guān)于kafka,我們會(huì)有專題文章介紹,這里簡單介紹幾個(gè)必須知道的概念。

          1.生產(chǎn)者(Producer)

          顧名思義,生產(chǎn)者就是生產(chǎn)消息的組件,它的主要工作就是源源不斷地生產(chǎn)出消息,然后發(fā)送給消息隊(duì)列。生產(chǎn)者可以向消息隊(duì)列發(fā)送各種類型的消息,如狹義的字符串消息,也可以發(fā)送二進(jìn)制消息。生產(chǎn)者是消息隊(duì)列的數(shù)據(jù)源,只有通過生產(chǎn)者持續(xù)不斷地向消息隊(duì)列發(fā)送消息,消息隊(duì)列才能不斷處理消息。
          2.消費(fèi)者(Consumer)

          所謂消費(fèi)者,指的是不斷消費(fèi)(獲取)消息的組件,它獲取消息的來源就是消息隊(duì)列(即Kafka本身)。換句話說,生產(chǎn)者不斷向消息隊(duì)列發(fā)送消息,而消費(fèi)者則不斷從消息隊(duì)列中獲取消息。
          3.主題(Topic)

          主題是Kafka中一個(gè)極為重要的概念。首先,主題是一個(gè)邏輯上的概念,它用于從邏輯上來歸類與存儲(chǔ)消息本身。多個(gè)生產(chǎn)者可以向一個(gè)Topic發(fā)送消息,同時(shí)也可以有多個(gè)消費(fèi)者消費(fèi)一個(gè)Topic中的消息。Topic還有分區(qū)和副本的概念。Topic與消息這兩個(gè)概念之間密切相關(guān),Kafka中的每一條消息都?xì)w屬于某一個(gè)Topic,而一個(gè)Topic下面可以有任意數(shù)量的消息。
          kafka簡單操作

          啟動(dòng)zk:nohup bin/zookeeper-server-start.sh config/zookeeper.properties &

          啟動(dòng)server: nohup bin/kafka-server-start.sh config/server.properties &

          創(chuàng)建一個(gè)topic:bin/kafka-topics.sh --create --zookeeper localhost:2181
          --replication-factor 1 --partitions 1 --topic test

          查看topic:bin/kafka-topics.sh --list --zookeeper localhost:2181

          發(fā)送數(shù)據(jù):bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

          啟動(dòng)一個(gè)消費(fèi)者:bin/kafka-console-consumer.sh --bootstrap-server localhost:9092
          --topic test --from-beginning

          刪除topic: bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic topn

          Flink消費(fèi)Kafka注意事項(xiàng)

          *
          setStartFromGroupOffsets()【默認(rèn)消費(fèi)策略】
          默認(rèn)讀取上次保存的offset信息 如果是應(yīng)用第一次啟動(dòng),讀取不到上次的offset信息,則會(huì)根據(jù)這個(gè)參數(shù)auto.offset.reset的值來進(jìn)行消費(fèi)數(shù)據(jù)
          * setStartFromEarliest()
          從最早的數(shù)據(jù)開始進(jìn)行消費(fèi),忽略存儲(chǔ)的offset信息
          * setStartFromLatest()
          從最新的數(shù)據(jù)進(jìn)行消費(fèi),忽略存儲(chǔ)的offset信息
          * setStartFromSpecificOffsets(Map)
          從指定位置進(jìn)行消費(fèi)
          *
          當(dāng)checkpoint機(jī)制開啟的時(shí)候,KafkaConsumer會(huì)定期把kafka的offset信息還有其他operator的狀態(tài)信息一塊保存起來。當(dāng)job失敗重啟的時(shí)候,F(xiàn)link會(huì)從最近一次的checkpoint中進(jìn)行恢復(fù)數(shù)據(jù),重新消費(fèi)kafka中的數(shù)據(jù)。
          * 為了能夠使用支持容錯(cuò)的kafka Consumer,需要開啟checkpoint
          env.enableCheckpointing(5000); // 每5s checkpoint一次
          搭建Kafka單機(jī)環(huán)境

          我本地安裝了一個(gè)kafka_2.11-2.1.0版本的kafka



          啟動(dòng)Zookeeper和kafka server:
          啟動(dòng)zk:nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
          啟動(dòng)server: nohup bin/kafka-server-start.sh config/server.properties &
          創(chuàng)建一個(gè)topic:
          bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1
          --partitions 1 --topic test


          實(shí)戰(zhàn)案例

          Kafka作為Flink Sink

          首先pom依賴:
          <dependency> <groupId>org.apache.flink</groupId>
          <artifactId>flink-connector-kafka_2.11</artifactId> <version>1.7.0</version>
          </dependency>
          向kafka寫入數(shù)據(jù):
          public class KafkaProducer { public static void main(String[] args) throws
          Exception{ StreamExecutionEnvironment env =
          StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String>
          text = env.addSource(new MyNoParalleSource()).setParallelism(1); Properties
          properties = new Properties(); properties.setProperty("bootstrap.servers",
          "localhost:9092"); //new FlinkKafkaProducer("topn",new
          KeyedSerializationSchemaWrapper(new
          SimpleStringSchema()),properties,FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
          FlinkKafkaProducer<String> producer = new FlinkKafkaProducer("test",new
          SimpleStringSchema(),properties); /* //event-timestamp事件的發(fā)生時(shí)間
          producer.setWriteTimestampToKafka(true); */ text.addSink(producer);
          env.execute(); } }//
          大家這里特別注意,我們實(shí)現(xiàn)了一個(gè)并行度為1的MyNoParalleSource來生產(chǎn)數(shù)據(jù),代碼如下:
          //使用并行度為1的source public class MyNoParalleSource implements
          SourceFunction<String> {//1 //private long count = 1L; private boolean
          isRunning = true; /** * 主要的方法 * 啟動(dòng)一個(gè)source *
          大部分情況下,都需要在這個(gè)run方法中實(shí)現(xiàn)一個(gè)循環(huán),這樣就可以循環(huán)產(chǎn)生數(shù)據(jù)了 * * @param ctx * @throws Exception */
          @Override public void run(SourceContext<String> ctx) throws Exception {
          while(isRunning){ //圖書的排行榜 List<String> books = new ArrayList<>();
          books.add("Pyhton從入門到放棄");//10 books.add("Java從入門到放棄");//8
          books.add("Php從入門到放棄");//5 books.add("C++從入門到放棄");//3
          books.add("Scala從入門到放棄");//0-4 int i = new Random().nextInt(5);
          ctx.collect(books.get(i)); //每2秒產(chǎn)生一條數(shù)據(jù) Thread.sleep(2000); } }
          //取消一個(gè)cancel的時(shí)候會(huì)調(diào)用的方法 @Override public void cancel() { isRunning = false; } }
          代碼實(shí)現(xiàn)了一個(gè)發(fā)送器,來發(fā)送書名等...

          然后右鍵運(yùn)行我們的程序,控制臺(tái)輸出如下:



          開始源源不斷的生產(chǎn)數(shù)據(jù)了。

          然后我們用命令去查看一下 kafka test這個(gè)topic:
          bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
          --from-beginning
          輸出如下:



          Kafka作為Flink Source

          直接上代碼:
          public class KafkaConsumer { public static void main(String[] args) throws
          Exception{ StreamExecutionEnvironment env =
          StreamExecutionEnvironment.getExecutionEnvironment(); Properties properties =
          new Properties(); properties.setProperty("bootstrap.servers",
          "localhost:9092"); FlinkKafkaConsumer<String> consumer = new
          FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties); //從最早開始消費(fèi)
          consumer.setStartFromEarliest(); DataStream<String> stream = env
          .addSource(consumer); stream.print(); //stream.map(); env.execute(); } }//
          控制臺(tái)輸出如下:


          將我們之前發(fā)往kafka的消息全部打印出來了。

          友情鏈接
          ioDraw流程圖
          API參考文檔
          OK工具箱
          云服務(wù)器優(yōu)惠
          阿里云優(yōu)惠券
          騰訊云優(yōu)惠券
          京東云優(yōu)惠券
          站點(diǎn)信息
          問題反饋
          郵箱:[email protected]
          QQ群:637538335
          關(guān)注微信

                欧美日韩诱惑 | 女人av网 | 影音先锋成人中文在线视频 | 翔田千里无码破解版 | 成人免费ZN码婬片在线观看免费 |