人人IT網

人人IT網

當前位置: 主頁 > JAVA編程 > JAVA >

大數據架構:flume-ng+Kafka+Storm+HDFS 實時系統組合

時間:2016-12-03 01:03來源:Internet 作者:Internet 點擊:
個人觀點:大數據我們都知道hadoop,但並不都是hadoop.我們該如何構建大數據庫項目。對於離線處理,hadoop還是比較适合的,但是對於實時性比較強的,數據量比較大的,我們可以采用Storm,那

個人觀點:大數據我們都知道hadoop,但並不都是hadoop.我們該如何構建大數據庫項目。對於離線處理,hadoop還是比較适合的,但是對於實時性比較強的,數據量比較大的,我們可以采用Storm,那麼Storm和什麼技術搭配,才能夠做一個适合自己的項目。下面给大家可以参考。
可以帶着下面問題來閱讀本文章:
1.一個好的項目架構應該具備什麼特點?
2.本項目架構是如何保證數據准確性的?
3.什麼是Kafka?
4.flume+kafka如何整合?
5.使用什麼腳本可以查看flume有沒有往Kafka傳輸數據


做軟件開發的都知道模塊化思想,這样設計的原因有兩方面:
一方面是可以模塊化,功能劃分更加清晰,從“數據采集--數據接入--流失計算--數據輸出/存儲”
<iframe id="iframe_0.9083128411768815" style="margin: 0px; padding: 0px; border-width: initial; border-style: none; width: 618px; height: 85px;" src="data:text/html;charset=utf8,%3Cimg%20id=%22img%22%20src=%22http://www.aboutyun.com/data/attachment/forum/201402/10/150104c96b5bknkbwbkhb5.jpg?_=5946251%22%20style=%22border:none;max-width:686px%22%3E%3Cscript%3Ewindow.onload%20=%20function%20()%20%7Bvar%20img%20=%20document.getElementById('img');%20window.parent.postMessage(%7BiframeId:'iframe_0.9083128411768815',width:img.width,height:img.height%7D,%20'http://www.cnblogs.com');%7D%3C/script%3E" frameborder="0" scrolling="no"></iframe> 

1).數據采集
負責從各節點上實時采集數據,選用cloudera的flume來實現
2).數據接入
由於采集數據的速度和數據處理的速度不一定同步,因此添加一個消息中間件來作为緩沖,選用apache的kafka
3).流式計算
對采集到的數據進行實時分析,選用apache的storm
4).數據輸出
對分析後的結果持久化,暫定用mysql
另一方面是模塊化之後,假如當Storm掛掉了之後,數據采集和數據接入還是繼續在跑着,數據不會丟失,storm起來之後可以繼續進行流式計算;

 

那麼接下來我們來看下整體的架構圖
<iframe id="iframe_0.2135765924440165" style="margin: 0px; padding: 0px; border-width: initial; border-style: none; width: 366px; height: 355px;" src="data:text/html;charset=utf8,%3Cimg%20id=%22img%22%20src=%22http://www.aboutyun.com/data/attachment/forum/201402/10/150105etbwmjcaoexcbta7.jpg?_=5946251%22%20style=%22border:none;max-width:686px%22%3E%3Cscript%3Ewindow.onload%20=%20function%20()%20%7Bvar%20img%20=%20document.getElementById('img');%20window.parent.postMessage(%7BiframeId:'iframe_0.2135765924440165',width:img.width,height:img.height%7D,%20'http://www.cnblogs.com');%7D%3C/script%3E" frameborder="0" scrolling="no"></iframe> 

 

詳細介紹各個組件及安裝配置:
操作系統:ubuntu

 

Flume
Flume是Cloudera提供的一個分布式、可靠、和高可用的海量日志采集、聚合和傳輸的日志收集系統,支持在日志系統中定制各類數據發送方,用於收集數據;同時,Flume提供對數據進行簡單處理,並寫到各種數據接受方(可定制)的能力。
下圖为flume典型的體系結構:
Flume數據源以及輸出方式:
Flume提供了從console(控制台)、RPC(Thrift-RPC)、text(文件)、tail(UNIX tail)、syslog(syslog日志系統,支持TCP和UDP等2種模式),exec(命令執行)等數據源上收集數據的能力,在我們的系統中目前使用exec方式進行日志采集。

Flume的數據接受方,可以是console(控制台)、text(文件)、dfs(HDFS文件)、RPC(Thrift-RPC)和syslogTCP(TCP syslog日志系統)等。在我們系統中由kafka來接收。

Flume下載及文檔:
http://flume.apache.org/
Flume安裝:
  1. $tar zxvf apache-flume-1.4.0-bin.tar.gz/usr/local
复制代碼
Flume启動命令:
  1. $bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name producer -Dflume.root.logger=INFO,console
复制代碼
Kafka

 

kafka是一種高吞吐量的分布式發布訂閱消息系統,她有如下特性:
  • 通過O(1)的磁盤數據結構提供消息的持久化,這種結構對於即使數以TB的消息存儲也能夠保持長時間的穩定性能。
  • 高吞吐量:即使是非常普通的硬件kafka也可以支持每秒數十萬的消息。
  • 支持通過kafka服務器和消費機集群來分區消息。
  • 支持Hadoop並行數據加載。
kafka的目的是提供一個發布訂閱解决方案,它可以處理消費者規模的網站中的所有動作流數據。 這種動作(網頁瀏覽,搜索和其他用戶的行動)是在現代網络上的許多社會功能的一個關鍵因素。 這些數據通常是由於吞吐量的要求而通過處理日志和日志聚合來解决。 對於像Hadoop的一样的日志數據和離線分析系統,但又要求實時處理的限制,這是一個可行的解决方案。kafka的目的是通過Hadoop的並行加載機制來統一線上和離線的消息處理,也是为了通過集群機來提供實時的消費。
kafka分布式訂閱架構如下圖:--取自Kafka官網
<iframe id="iframe_0.015943506188883294" style="margin: 0px; padding: 0px; border-width: initial; border-style: none; width: 258px; height: 180px;" src="data:text/html;charset=utf8,%3Cimg%20id=%22img%22%20src=%22http://www.aboutyun.com/data/attachment/forum/201402/10/150105je8xweaxjsassesa.png?_=5946251%22%20style=%22border:none;max-width:686px%22%3E%3Cscript%3Ewindow.onload%20=%20function%20()%20%7Bvar%20img%20=%20document.getElementById('img');%20window.parent.postMessage(%7BiframeId:'iframe_0.015943506188883294',width:img.width,height:img.height%7D,%20'http://www.cnblogs.com');%7D%3C/script%3E" frameborder="0" scrolling="no"></iframe>
罗寶兄弟文章上的架構圖是這样的
<iframe id="iframe_0.8381029327494822" style="margin: 0px; padding: 0px; border-width: initial; border-style: none; width: 562px; height: 312px;" src="data:text/html;charset=utf8,%3Cimg%20id=%22img%22%20src=%22http://www.aboutyun.com/data/attachment/forum/201402/10/150106wfh8u7d4hbmsnh7u.jpg?_=5946251%22%20style=%22border:none;max-width:686px%22%3E%3Cscript%3Ewindow.onload%20=%20function%20()%20%7Bvar%20img%20=%20document.getElementById('img');%20window.parent.postMessage(%7BiframeId:'iframe_0.8381029327494822',width:img.width,height:img.height%7D,%20'http://www.cnblogs.com');%7D%3C/script%3E" frameborder="0" scrolling="no"></iframe>
其實兩者沒有太大區別,官網的架構圖只是把Kafka簡潔的表示成一個Kafka Cluster,而上面架構圖就相對詳細一些;

 

Kafka版本:0.8.0
Kafka安裝:
  1. > tar xzf kafka-<VERSION>.tgz
  2. > cd kafka-<VERSION>
  3. > ./sbt update
  4. > ./sbt package
  5. > ./sbt assembly-package-dependency
复制代碼
启動及測試命令:
(1) start server

 

  1. > bin/zookeeper-server-start.shconfig/zookeeper.properties
  2. > bin/kafka-server-start.shconfig/server.properties
复制代碼
這裏是官網上的教程,kafka本身有內置zookeeper,但是我自己在實際部署中是使用單獨的zookeeper集群,所以第一行命令我就沒執行,這裏只是些出來给大家看下。

 

配置獨立的zookeeper集群需要配置server.properties文件,講zookeeper.connect修改为獨立集群的IP和端口

 

  1. zookeeper.connect=nutch1:2181
复制代碼
(2)Create a topic

 

  1. > bin/kafka-create-topic.sh --zookeeper localhost:2181 --replica 1 --partition 1 --topic test
  2. > bin/kafka-list-topic.sh --zookeeperlocalhost:2181
复制代碼
(3)Send some messages

 

  1. > bin/kafka-console-producer.sh--broker-list localhost:9092 --topic test
复制代碼
(4)Start a consumer

 

  1. > bin/kafka-console-consumer.sh--zookeeper localhost:2181 --topic test --from-beginning
复制代碼
kafka-console-producer.sh和kafka-console-cousumer.sh只是系統提供的命令行工具。這裏启動是为了測試是否能正常生產消費;驗證流程正確性
在實際開發中還是要自行開發自己的生產者與消費者;
kafka的安裝也可以参考我之前寫的文章:http://go.rritw.com/blog.csdn.net/weijonathan/article/details/18075967
Storm
Twitter將Storm正式開源了,這是一個分布式的、容錯的實時計算系統,它被托管在GitHub上,遵循  Eclipse Public License 1.0。Storm是由BackType開發的實時處理系統,BackType現在已在Twitter麾下。GitHub上的最新版本是Storm 0.5.2,基本是用Clojure寫的。
<iframe id="iframe_0.5267034409118465" style="margin: 0px; padding: 0px; border-width: initial; border-style: none; width: 643px; height: 453px;" src="data:text/html;charset=utf8,%3Cimg%20id=%22img%22%20src=%22http://www.aboutyun.com/data/attachment/forum/201402/10/150107qor24znw2nrbgzru.jpg?_=5946251%22%20style=%22border:none;max-width:686px%22%3E%3Cscript%3Ewindow.onload%20=%20function%20()%20%7Bvar%20img%20=%20document.getElementById('img');%20window.parent.postMessage(%7BiframeId:'iframe_0.5267034409118465',width:img.width,height:img.height%7D,%20'http://www.cnblogs.com');%7D%3C/script%3E" frameborder="0" scrolling="no"></iframe> 

 

Storm的主要特點如下:
  • 簡單的編程模型。類似於MapReduce降低了並行批處理复雜性,Storm降低了進行實時處理的复雜性。
  • 可以使用各種編程語言。你可以在Storm之上使用各種編程語言。默認支持Clojure、Java、Ruby和Python。要增加對其他語言的支持,只需實現一個簡單的Storm通信協議即可。
  • 容錯性。Storm會管理工作進程和節點的故障。
  • 水平擴展。計算是在多個線程、進程和服務器之間並行進行的。
  • 可靠的消息處理。Storm保證每個消息至少能得到一次完整處理。任務失敗時,它會負責從消息源重試消息。
  • 快速。系統的設計保證了消息能得到快速的處理,使用ØMQ作为其底層消息隊列。(0.9.0.1版本支持ØMQ和netty兩種模式)
  • 本地模式。Storm有一個“本地模式”,可以在處理過程中完全模擬Storm集群。這讓你可以快速進行開發和單元測試。
由於篇幅問題,具體的安裝步驟可以参考:Storm-0.9.0.1安裝部署 指導
接下來重頭戲開始拉!那就是框架之間的整合啦

 

flume和kafka整合
2.提取插件中的flume-conf.properties文件
修改該文件:#source section
producer.sources.s.type = exec
producer.sources.s.command = tail -f -n+1 /mnt/hgfs/vmshare/test.log
producer.sources.s.channels = c
修改所有topic的值改为test
將改後的配置文件放進flume/conf目錄下
在該項目中提取以下jar包放入環境中flume的lib下:
完成上面的步驟之後,我們來測試下flume+kafka這個流程有沒有走通;
我們先启動flume,然後再启動kafka,启動步驟按之前的步驟執行;接下來我們使用kafka的kafka-console-consumer.sh腳本查看是否有flume有沒有往Kafka傳輸數據;
<iframe id="iframe_0.9849005599399405" style="margin: 0px; padding: 0px; border-width: initial; border-style: none; width: 686px; height: 433px;" src="data:text/html;charset=utf8,%3Cimg%20id=%22img%22%20src=%22http://www.aboutyun.com/data/attachment/forum/201402/10/150107c7yl817orzd780fa.jpg?_=5946251%22%20style=%22border:none;max-width:686px%22%3E%3Cscript%3Ewindow.onload%20=%20function%20()%20%7Bvar%20img%20=%20document.getElementById('img');%20window.parent.postMessage(%7BiframeId:'iframe_0.9849005599399405',width:img.width,height:img.height%7D,%20'http://www.cnblogs.com');%7D%3C/script%3E" frameborder="0" scrolling="no"></iframe> 
以上這個是我的test.log文件通過flume抓取傳到kafka的數據;說明我們的flume和kafka流程走通了;
大家還記得剛開始我們的流程圖麼,其中有一步是通過flume到kafka,還有一步是到hdfs的;而我們這邊還沒有提到如何存入kafka且同時存如hdfs;
flume是支持數據同步复制,同步复制流程圖如下,取自於flume官網,官網用戶指南地址:http://go.rritw.com/flume.apache.org/FlumeUserGuide.html
<iframe id="iframe_0.64731552137773" style="margin: 0px; padding: 0px; border-width: initial; border-style: none; width: 686px; height: 388px;" src="data:text/html;charset=utf8,%3Cimg%20id=%22img%22%20src=%22http://www.aboutyun.com/data/attachment/forum/201402/10/150108zso6yllyrzwc6c75.jpg?_=5946251%22%20style=%22border:none;max-width:686px%22%3E%3Cscript%3Ewindow.onload%20=%20function%20()%20%7Bvar%20img%20=%20document.getElementById('img');%20window.parent.postMessage(%7BiframeId:'iframe_0.64731552137773',width:img.width,height:img.height%7D,%20'http://www.cnblogs.com');%7D%3C/script%3E" frameborder="0" scrolling="no"></iframe> 
怎麼設置同步复制呢,看下面的配置:

 

  1. #2個channel和2個sink的配置文件  這裏我們可以設置兩個sink,一個是kafka的,一個是hdfs的;
  2. a1.sources = r1
  3. a1.sinks = k1 k2
  4. a1.channels = c1 c2
复制代碼
具體配置大夥根據自己的需求去設置,這裏就不具體舉例了

 

kafka和storm的整合

 

2.使用maven package進行編譯,得到storm-kafka-0.8-plus-0.3.0-SNAPSHOT.jar包   --有轉載的童鞋注意下,這裏的包名之前寫錯了,現在改正確了!不好意思!
3.將該jar包及kafka_2.9.2-0.8.0-beta1.jar、metrics-core-2.2.0.jar、scala-library-2.9.2.jar (這三個jar包在kafka項目中能找到)
備注:如果開發的項目需要其他jar,記得也要放進storm的Lib中比如用到了mysql就要添加mysql-connector-java-5.1.22-bin.jar到storm的lib下
那麼接下來我們把storm也重启下;
完成以上步驟之後,我們還有一件事情要做,就是使用kafka-storm0.8插件,寫一個自己的Storm程序;
這裏我给大夥附上一個我弄的storm程序,百度網盤分享地址:http://pan.baidu.com/s/1hqFVjBQ
先稍微看下程序的創建Topology代碼
<iframe id="iframe_0.6791582110609324" style="margin: 0px; padding: 0px; border-width: initial; border-style: none; width: 686px; height: 287px;" src="data:text/html;charset=utf8,%3Cimg%20id=%22img%22%20src=%22http://www.aboutyun.com/data/attachment/forum/201402/10/150109bwya0kwkjgkdeybu.jpg?_=5946251%22%20style=%22border:none;max-width:686px%22%3E%3Cscript%3Ewindow.onload%20=%20function%20()%20%7Bvar%20img%20=%20document.getElementById('img');%20window.parent.postMessage(%7BiframeId:'iframe_0.6791582110609324',width:img.width,height:img.height%7D,%20'http://www.cnblogs.com');%7D%3C/script%3E" frameborder="0" scrolling="no"></iframe> 
數據操作主要在WordCounter類中,這裏只是使用簡單JDBC進行插入處理
<iframe id="iframe_0.31630804945808677" style="margin: 0px; padding: 0px; border-width: initial; border-style: none; width: 686px; height: 569px;" src="data:text/html;charset=utf8,%3Cimg%20id=%22img%22%20src=%22http://www.aboutyun.com/data/attachment/forum/201402/10/150111tsnn0nn469sn6mis.jpg?_=5946251%22%20style=%22border:none;max-width:686px%22%3E%3Cscript%3Ewindow.onload%20=%20function%20()%20%7Bvar%20img%20=%20document.getElementById('img');%20window.parent.postMessage(%7BiframeId:'iframe_0.31630804945808677',width:img.width,height:img.height%7D,%20'http://www.cnblogs.com');%7D%3C/script%3E" frameborder="0" scrolling="no"></iframe> 
這裏只需要輸入一個参數作为Topology名稱就可以了!我們這裏使用本地模式,所以不輸入参數,直接看流程是否走通;

 

  1. storm-0.9.0.1/bin/storm jar storm-start-demo-0.0.1-SNAPSHOT.jar com.storm.topology.MyTopology
复制代碼
先看下日志,這裏打印出來了往數據庫裏面插入數據了
<iframe id="iframe_0.8443197785437921" style="margin: 0px; padding: 0px; border-width: initial; border-style: none; width: 686px; height: 370px;" src="data:text/html;charset=utf8,%3Cimg%20id=%22img%22%20src=%22http://www.aboutyun.com/data/attachment/forum/201402/10/150112vhav4cb0nymizezf.jpg?_=5946251%22%20style=%22border:none;max-width:686px%22%3E%3Cscript%3Ewindow.onload%20=%20function%20()%20%7Bvar%20img%20=%20document.getElementById('img');%20window.parent.postMessage(%7BiframeId:'iframe_0.8443197785437921',width:img.width,height:img.height%7D,%20'http://www.cnblogs.com');%7D%3C/script%3E" frameborder="0" scrolling="no"></iframe> 
然後我們查看下數據庫;插入成功了!
<iframe id="iframe_0.02512746802581356" style="margin: 0px; padding: 0px; border-width: initial; border-style: none; width: 196px; height: 659px;" src="data:text/html;charset=utf8,%3Cimg%20id=%22img%22%20src=%22http://www.aboutyun.com/data/attachment/forum/201402/10/150113v5p5cnb565nb6nb5.jpg?_=5946251%22%20style=%22border:none;max-width:686px%22%3E%3Cscript%3Ewindow.onload%20=%20function%20()%20%7Bvar%20img%20=%20document.getElementById('img');%20window.parent.postMessage(%7BiframeId:'iframe_0.02512746802581356',width:img.width,height:img.height%7D,%20'http://www.cnblogs.com');%7D%3C/script%3E" frameborder="0" scrolling="no"></iframe> 
到這裏我們的整個整合就完成了!
但是這裏還有一個問題,不知道大夥有沒有發現。
由於我們使用storm進行分布式流式計算,那麼分布式最需要注意的是數據一致性以及避免脏數據的產生;所以我提供的測試項目只能用於測試,正式開發不能這样處理;
晨色星空J2EE(一個網名)给的建議是建立一個zookeeper的分布式全局锁,保證數據一致性,避免脏數據錄入!
zookeeper客戶端框架大夥可以使用Netflix Curator來完成,由於這塊我還沒去看,所以只能寫到這裏了!
 
http://blog.csdn.net/weijonathan/article/details/18301321
每天進步一點點
http://www.cnblogs.com/onetwo/p/5946251.html 
 

From:ITEYE
頂一下
(0)
0%
踩一下
(0)
0%
------分隔線----------------------------
發表評論
請自覺遵守互聯網相關的政策法規,嚴禁發布色情、暴力、反動的言論。
評價:
表情:
驗證碼:點擊我更換圖片
欄目列表
推薦內容