在大數(shù)據(jù)時代,數(shù)據(jù)規(guī)模變得越來越大。由于數(shù)據(jù)的增長速度和非結(jié)構(gòu)化的特性,常用的軟硬件工具已無法在用戶可容忍的時間內(nèi)對數(shù)據(jù)進(jìn)行采集、管理和處理。本文主要介紹如何在阿里云上使用Kafka和Storm搭建大規(guī)模消息分發(fā)和實時數(shù)據(jù)流處理系統(tǒng),以及這個過程中主要遭遇的一些挑戰(zhàn)。實踐主要立足建立一套汽車狀態(tài)實時監(jiān)控系統(tǒng),可以在阿里云上立即進(jìn)行部署。
一、實時大數(shù)據(jù)處理利器——Storm和Kafka
大數(shù)據(jù)時代,隨著可獲取數(shù)據(jù)的渠道增多,比如常見的電子商務(wù)、網(wǎng)絡(luò)、傳感器的數(shù)據(jù)流、太空數(shù)據(jù)等,數(shù)據(jù)規(guī)模也變得越來越大;同時,不同的渠道往往產(chǎn)生更多的數(shù)據(jù)類型,這些衍生的數(shù)據(jù)增長非常之快,規(guī)模非常之大。大數(shù)據(jù)時代各個機(jī)構(gòu)可謂是坐擁金山,然而目前大數(shù)據(jù)技術(shù)的應(yīng)用卻仍然存在眾多挑戰(zhàn),主要出現(xiàn)在數(shù)據(jù)收集、存儲、處理和可視化幾個過程。
1. 數(shù)據(jù)收集
Gartner的Merv Adrian對大數(shù)據(jù)有這樣一個定義:“大數(shù)據(jù)讓常用硬件軟件工具無法在用戶可容忍時間內(nèi)對數(shù)據(jù)進(jìn)行采集、管理和處理。”[1]麥肯錫全球研究院在2011年5月也有這樣一個概念:“大數(shù)據(jù)是指超出典型數(shù)據(jù)庫軟件工具采集、存儲、管理和分析能力的數(shù)據(jù)集。”[2]從上面的定義可以看出,大數(shù)據(jù)最大的挑戰(zhàn)在于如何在有限時間內(nèi)對數(shù)據(jù)進(jìn)行處理和分析,并得到有用信息。
2. 數(shù)據(jù)處理
大數(shù)據(jù)處理中最著名的工具是Hadoop,不過它并不是一套實時系統(tǒng)。為了解決這個問題,計算機(jī)工程師們又開發(fā)了Storm和Kafka。Apache Storm是一套開源的分布式實時計算系統(tǒng)。最早由Nathan Marz[3]開發(fā),在被Twitter收購后開源,并在2014年9月起成為Apache頂級開源項目。Storm被廣泛用于各種商業(yè)網(wǎng)站,包括Twitter、Yelp、Groupon、百度、淘寶等。Storm的使用場景非常廣泛,例如實時分析、在線機(jī)器學(xué)習(xí)、連續(xù)計算、分部署RPC、ET|等。Storm有著非常快的處理速度,單節(jié)點可以達(dá)到百萬個元組每秒,此外它還具有高擴(kuò)展、容錯、保證數(shù)據(jù)處理等特性。圖1是Storm的一個簡單的架構(gòu)。
圖1 Storm架構(gòu)
Apache Kafka也是一個開源的系統(tǒng),旨在提供一個統(tǒng)一的,高吞吐、低延遲的分布式消息處理平臺來對實時數(shù)據(jù)進(jìn)行處理。它最早由LinkedIn開發(fā),開源于2011年并被貢獻(xiàn)給了Apache。Kafka區(qū)別于傳統(tǒng)RabbitMQ、Apache ActiveMQ等消息系統(tǒng)的地方主要在于:分布式系統(tǒng)特性,易于擴(kuò)展;為發(fā)布和訂閱提供高吞吐量;支持多訂閱,可以自動平衡消費者;可以將消息持久化到磁盤,可以用于批量消費,例如ETL等。
圖2 Kafka架構(gòu)
二、在阿里云上部署Storm和Kafka
我們需要設(shè)計一個實時車輛監(jiān)控系統(tǒng),這個系統(tǒng)要將汽車駕駛過程中實時的位置,速度,轉(zhuǎn)速,油耗以及轉(zhuǎn)速發(fā)送到系統(tǒng)中,從而可以實時計算出車流量和污染物排放量。該系統(tǒng)的目標(biāo)是要能同事支持10萬輛車同時發(fā)送消息,在最高峰能滿足100萬輛車。為了實現(xiàn)如此規(guī)模的消息分發(fā)和吞吐,我們基于Kafka和Storm來設(shè)計實現(xiàn)。同時為了滿足高擴(kuò)展性,我們將Storm和Kafka分別部署到不同的服務(wù)器上,如果需要更多的計算能力,可以隨時通過創(chuàng)建新的服務(wù)器的方式來完成。此外為了滿足高可用性,每臺相同功能的服務(wù)器也需要至少部署2臺,這樣一旦一臺服務(wù)器出現(xiàn)問題,另外一臺服務(wù)器也可以持續(xù)提供服務(wù)。
在實體服務(wù)器上部署Storm和Kafka等系統(tǒng)涉及到大量服務(wù)器集群和軟件的安裝部署,這個過程需要花費大量時間,而
云計算則很好的彌補(bǔ)了這一點——提供各種虛擬服務(wù)器和鏡像功能,加快基礎(chǔ)設(shè)施和軟件的部署過程。
圖3 車聯(lián)網(wǎng)監(jiān)控系統(tǒng)架構(gòu)
我們需要2臺服務(wù)器來構(gòu)建Kafka代理服務(wù)器,在Storm中還需要2臺服務(wù)器來運行Spout和2個Bolt,另外在Redis層則需要2臺服務(wù)器來部署緩存,再加上2臺服務(wù)器作為Web服務(wù)器。服務(wù)器架構(gòu)圖如圖4所示。
圖4 車聯(lián)網(wǎng)監(jiān)控系統(tǒng)架構(gòu)
在部署車聯(lián)網(wǎng)監(jiān)控系統(tǒng)之前,我們首先需要在每臺服務(wù)器上部署相應(yīng)的軟件,包括Git、Libzmq、Java、G++等,用于代碼編譯和相關(guān)軟件安裝。可以使用SSH連接到相應(yīng)的機(jī)器。用戶名密碼則會由阿里云以郵件或者短消息的方式提供。
在車聯(lián)網(wǎng)實時監(jiān)控系統(tǒng)中,我們需要部署4種不同類型的服務(wù)器,分別是網(wǎng)站前臺服務(wù)器、Kafka服務(wù)器、Storm服務(wù)器和緩存服務(wù)器,以滿足上面提到的高擴(kuò)展性的要求。在每一種類型的服務(wù)器部署完成之后,都可以通過阿里云鏡像的功能,創(chuàng)建一個能隨時使用的鏡像,這樣在擴(kuò)展服務(wù)器的時候就不需要重新安裝軟件,直接通過鏡像創(chuàng)建服務(wù)器就可以了。
以下命令需要在所有服務(wù)器上運行以安裝相應(yīng)的軟件:
以下命令安裝在緩存服務(wù)器和Kafka服務(wù)器上:
另外,我們還需要在Storm的服務(wù)器安裝maven和lein用于代碼編譯:
在Kafka服務(wù)器上安裝Kafka:
對于Storm和Kafka的安裝,到這一步已基本完成,接下去需要分別創(chuàng)建鏡像。創(chuàng)建鏡像的方法是先創(chuàng)建阿里云快照,然后通過將快照轉(zhuǎn)換為鏡像的方式完成。具體步驟如下:
在阿里云的管理界面選擇云服務(wù)器,隨后選擇該服務(wù)器的磁盤列表,點擊創(chuàng)建快照。
輸入快照名稱并確認(rèn)。
阿里云會自動為云服務(wù)器的系統(tǒng)盤創(chuàng)建快照,當(dāng)創(chuàng)建完成以后,會出現(xiàn)“創(chuàng)建自定義鏡像”按鈕。
點擊“創(chuàng)建自定義鏡像”的按鈕,阿里云就會將這個快照轉(zhuǎn)換為鏡像,可以在阿里云ECS管理界面的自定義鏡像欄中看到。
接下來,我們通過鏡像可以直接創(chuàng)建相同配置的ECS服務(wù)器。
圖5 從自定義鏡像中創(chuàng)建云服務(wù)器
當(dāng)然,在自動擴(kuò)展實現(xiàn)上,云服務(wù)并不需要用戶去手動執(zhí)行,這里我們使用阿里云的ECS REST API自動通過鏡像創(chuàng)建機(jī)器。可以參考以下Python代碼,自動創(chuàng)建阿里云ECS虛擬機(jī):
三、基于Storm和Kafka的車輛信息實時監(jiān)控系統(tǒng)打造
接下來做的就是將車輛信息實時監(jiān)控系統(tǒng)部署到系統(tǒng)中。這個系統(tǒng)演示了如何編寫一個Storm的Topology,從Kafka消息系統(tǒng)中將信息讀取出來。我們使用Kafka的客戶端模擬從世界各地發(fā)送車輛實時信息給Kafka集群,然后Storm Topology會把這些消息通過Bolts將坐標(biāo)轉(zhuǎn)換為Json對象,并且使用GeoJSON在Bing Map上顯示車輛的實時位置、溫度、轉(zhuǎn)速以及速度等等信息。Topology還會將信息寫到Redis緩存中,然后Node.js通過socket.io讀取Redis中的信息,并且使用d3js顯示在頁面上。
首先,我們需要編寫Kafka 生產(chǎn)者的部分代碼,主要是模擬讀取汽車的實時數(shù)據(jù)并向Kafka集群進(jìn)行發(fā)送,我們實現(xiàn)了一個KafkaCarDataProducer類,通過配置ProducerConfig來創(chuàng)建一個Producer對象來發(fā)送數(shù)據(jù)。它可以用來連接到Zookeeper,或者直接是Kafka 代理。例如:kafkaclient.cloudapp.net:2181或者0:kafkaclient.cloudapp.net:9092。代碼中我們根據(jù)不同的連接字符串設(shè)置不同配置。偽代碼如下:
然后就可以直接通過下面代碼來發(fā)送消息:
接下來我們需要編寫3個Storm類,首先是創(chuàng)建Storm的Topology,這個類叫KafkaCarTopology,我們創(chuàng)建了一個叫car的topic,然后定義本機(jī)一個hosts和Zookeeper hosts,最后創(chuàng)建一個Spout,叫做KafkaSpout,然后添加ParseCarDataBolt連接到KafkaSout,再創(chuàng)建一個RedisCarBolt,用于將結(jié)果寫入Redis緩存。最后根據(jù)參數(shù)創(chuàng)建3個Worker,提交Storm Topology。
在這個拓?fù)浣Y(jié)構(gòu)中,我們有2個Bolt用于數(shù)據(jù)的處理,第一個叫ParserCarDataBolt,這個Bolt主要將Kafka傳出的消息轉(zhuǎn)換為Json格式,它繼承BaseBasicBolt,在execute函數(shù)中通過collector提交數(shù)據(jù),同時重載了declareOutputFields函數(shù),通知下一個Bolt的數(shù)據(jù)格式。代碼如下:
數(shù)據(jù)會被寫入RedisCarBolt,再寫入到Redis緩存中。它繼承自BaseRichBolt,需要重載prepare和excute方法來處理消息元組。此外還需要重載prepare和cleanup函數(shù),幾個關(guān)鍵的函數(shù)如下:
最后我們還需要編寫一些Node.js的代碼,保證在頁面上通過socket.io進(jìn)行通訊,實時將最終數(shù)據(jù)從Redis里面讀取出來,并在BingMap上顯示。
到此為止,一個簡單的車輛信息實時監(jiān)控系統(tǒng)就實現(xiàn)了,我們通過bash腳本進(jìn)行編譯,并安裝到相應(yīng)的服務(wù)器上,比如下列代碼需要被安裝在Storm的服務(wù)器上:
有一點需要注意的是,由于在編譯過程中需要自動下載Storm庫,在阿里云的國內(nèi)機(jī)房的虛擬機(jī)很有可能需要設(shè)置代理進(jìn)行。設(shè)置代理的方法也很簡單,通過對lein命令增加以下參數(shù)就可以了:http_proxy=http://URL:PORT
接著我們在網(wǎng)頁上訪問http://webhostname或者運行node.js的服務(wù)器,就會看到下面的網(wǎng)頁,同時發(fā)現(xiàn)網(wǎng)頁將同步刷新汽車的實時位置、速度、轉(zhuǎn)速等。
圖6 車聯(lián)網(wǎng)監(jiān)控系統(tǒng)演示頁面
四、對車聯(lián)網(wǎng)監(jiān)控系統(tǒng)的性能測試
接下來我們對這個系統(tǒng)進(jìn)行了一個簡單的吞吐量測試。我們只有1個Topic,使用5個partition、3個worker、1個Spout和2個Bolt,在一臺2核2GB的ECS上運行。我們使用了另外4臺客戶端,每個客戶端有4核8G內(nèi)存,分別啟動40個線程不斷向這個系統(tǒng)實時發(fā)送汽車信息,模擬160臺汽車發(fā)送的情況,其消息發(fā)送數(shù)量和CPU占用率情況如圖7所示。
圖7 車聯(lián)網(wǎng)監(jiān)控系統(tǒng)性能分析
從圖7中可以看出,平均每輛汽車客戶端會模擬每秒給系統(tǒng)發(fā)送了1000條消息,總的吞吐量達(dá)到16萬條左右,此時平均的CPU占用率大約在30%左右。如果系統(tǒng)是完全線性的,在系統(tǒng)CPU占用率達(dá)到90%的情況下,大約能處理48萬條消息。不過實際情況中,在阿里云ECS上,卻發(fā)現(xiàn)CPU達(dá)到50%以后,就不再上升,而客戶端發(fā)送消息的延時也逐步增加。
經(jīng)過分析以后發(fā)現(xiàn),由于ECS的磁盤性能無法和物理機(jī)的SSD磁盤相比,所以在Kafka消息大量寫入磁盤的過程中,吞吐量下降,磁盤讀寫負(fù)擔(dān)變得非常大。這時我們增加了Kafka的Broker和Storm的Spout的數(shù)量,將消息分布式地分發(fā)到多臺ECS上,從而實現(xiàn)了消息吞吐量的線性增加。
在這個系統(tǒng)中,我們不推薦使用大核和大內(nèi)存的機(jī)器,而推薦使用多臺2核2GB的服務(wù)器分布式地處理消息。這也是云計算處理大數(shù)據(jù)的原則所在,使用橫向擴(kuò)展而不用縱向擴(kuò)展。
五、結(jié)論
至此我們介紹了利用Storm和Kafka實現(xiàn)大數(shù)據(jù)的實時處理,并且介紹了如何在云上通過鏡像快速地創(chuàng)建這套系統(tǒng)。此外,我們還介紹了如何對Storm、Kafka、Redis以及Node.js開發(fā)出一個實時的車輛信息監(jiān)控系統(tǒng)。這個系統(tǒng)能夠?qū)崿F(xiàn)高性能、大吞吐量和高并發(fā)。當(dāng)然,隨著大數(shù)據(jù)的快速發(fā)展,我們相信還會有越來越多好的工具和產(chǎn)品出現(xiàn)在市場上,到那時我們從大數(shù)據(jù)中獲取有效的信息將會變得更加容易和便捷。有了云計算的幫助,開發(fā)的周期也會變得越來越短。
核心關(guān)注:拓步ERP系統(tǒng)平臺是覆蓋了眾多的業(yè)務(wù)領(lǐng)域、行業(yè)應(yīng)用,蘊涵了豐富的ERP管理思想,集成了ERP軟件業(yè)務(wù)管理理念,功能涉及供應(yīng)鏈、成本、制造、CRM、HR等眾多業(yè)務(wù)領(lǐng)域的管理,全面涵蓋了企業(yè)關(guān)注ERP管理系統(tǒng)的核心領(lǐng)域,是眾多中小企業(yè)信息化建設(shè)首選的ERP管理軟件信賴品牌。
轉(zhuǎn)載請注明出處:拓步ERP資訊網(wǎng)http://www.guhuozai8.cn/
本文標(biāo)題:在云上搭建大規(guī)模實時數(shù)據(jù)流處理系統(tǒng)
本文網(wǎng)址:http://www.guhuozai8.cn/html/consultation/10839717486.html