您好,登錄后才能下訂單哦!
本篇文章為大家展示了云函數(shù)SCF與對象存儲實現(xiàn)WordCount算法的過程,內(nèi)容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細(xì)介紹希望你能有所收獲。
將嘗試通過 MapReduce 模型實現(xiàn)一個簡單的 WordCount 算法,區(qū)別于傳統(tǒng)使用 Hadoop 等大數(shù)據(jù)框架,使用云函數(shù) SCF 與對象存儲 COS 來實現(xiàn)。
MapReduce 在維基百科中的解釋如下:
MapReduce 是 Google 提出的一個軟件架構(gòu),用于大規(guī)模數(shù)據(jù)集(大于 1TB)的并行運算。概念「Map(映射)」和「Reduce(歸納)」,及他們的主要思想,都是從函數(shù)式編程語言借來的,還有從矢量編程語言借來的特性。
通過這段描述,我們知道,MapReduce 是面向大數(shù)據(jù)并行處理的計算模型、框架和平臺,在傳統(tǒng)學(xué)習(xí)中,通常會在 Hadoop 等分布式框架下進行 MapReduce 相關(guān)工作,隨著云計算的逐漸發(fā)展,各個云廠商也都先后推出了在線的 MapReduce 業(yè)務(wù)。
在開始之前,我們根據(jù) MapReduce 的要求,先繪制一個簡單的流程圖:
在這個結(jié)構(gòu)中,我們需要 2 個云函數(shù)分別作 Mapper 和 Reducer;以及 3 個對象存儲的存儲桶,分別作為輸入的存儲桶、中間臨時緩存存儲桶和結(jié)果存儲桶。在實例前,由于我們的函數(shù)即將部署在廣州區(qū),因此在廣州區(qū)建立 3 個存儲桶:
對象存儲1 ap-guangzhou srcmr 對象存儲2 ap-guangzhou middlestagebucket 對象存儲3 ap-guangzhou destcmr
為了讓整個 Mapper 和 Reducer 邏輯更加清晰,在開始之前先對傳統(tǒng)的 WordCount 結(jié)構(gòu)進行改造,使其更加適合云函數(shù),同時合理分配 Mapper 和 Reducer 的工作:
編寫 Mapper 相關(guān)邏輯,代碼如下:
# -*- coding: utf8 -*- import datetime from qcloud_cos_v5 import CosConfig from qcloud_cos_v5 import CosS3Client from qcloud_cos_v5 import CosServiceError import re import os import sys import logging logging.basicConfig(level=logging.INFO, stream=sys.stdout) logger = logging.getLogger() logger.setLevel(level=logging.INFO) region = u'ap-guangzhou' # 根據(jù)實際情況,修改地域 middle_stage_bucket = 'middlestagebucket' # 根據(jù)實際情況,修改bucket名 def delete_file_folder(src): if os.path.isfile(src): try: os.remove(src) except: pass elif os.path.isdir(src): for item in os.listdir(src): itemsrc = os.path.join(src, item) delete_file_folder(itemsrc) try: os.rmdir(src) except: pass def download_file(cos_client, bucket, key, download_path): logger.info("Get from [%s] to download file [%s]" % (bucket, key)) try: response = cos_client.get_object(Bucket=bucket, Key=key, ) response['Body'].get_stream_to_file(download_path) except CosServiceError as e: print(e.get_error_code()) print(e.get_error_msg()) return -1 return 0 def upload_file(cos_client, bucket, key, local_file_path): logger.info("Start to upload file to cos") try: response = cos_client.put_object_from_local_file( Bucket=bucket, LocalFilePath=local_file_path, Key='{}'.format(key)) except CosServiceError as e: print(e.get_error_code()) print(e.get_error_msg()) return -1 logger.info("Upload data map file [%s] Success" % key) return 0 def do_mapping(cos_client, bucket, key, middle_stage_bucket, middle_file_key): src_file_path = u'/tmp/' + key.split('/')[-1] middle_file_path = u'/tmp/' + u'mapped_' + key.split('/')[-1] download_ret = download_file(cos_client, bucket, key, src_file_path) # download src file if download_ret == 0: inputfile = open(src_file_path, 'r') # open local /tmp file mapfile = open(middle_file_path, 'w') # open a new file write stream for line in inputfile: line = re.sub('[^a-zA-Z0-9]', ' ', line) # replace non-alphabetic/number characters words = line.split() for word in words: mapfile.write('%st%s' % (word, 1)) # count for 1 mapfile.write('n') inputfile.close() mapfile.close() upload_ret = upload_file(cos_client, middle_stage_bucket, middle_file_key, middle_file_path) # upload the file's each word delete_file_folder(src_file_path) delete_file_folder(middle_file_path) return upload_ret else: return -1 def map_caller(event, context, cos_client): appid = event['Records'][0]['cos']['cosBucket']['appid'] bucket = event['Records'][0]['cos']['cosBucket']['name'] + '-' + appid key = event['Records'][0]['cos']['cosObject']['key'] key = key.replace('/' + str(appid) + '/' + event['Records'][0]['cos']['cosBucket']['name'] + '/', '', 1) logger.info("Key is " + key) middle_bucket = middle_stage_bucket + '-' + appid middle_file_key = '/' + 'middle_' + key.split('/')[-1] return do_mapping(cos_client, bucket, key, middle_bucket, middle_file_key) def main_handler(event, context): logger.info("start main handler") if "Records" not in event.keys(): return {"errorMsg": "event is not come from cos"} secret_id = "" secret_key = "" config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, ) cos_client = CosS3Client(config) start_time = datetime.datetime.now() res = map_caller(event, context, cos_client) end_time = datetime.datetime.now() print("data mapping duration: " + str((end_time - start_time).microseconds / 1000) + "ms") if res == 0: return "Data mapping SUCCESS" else: return "Data mapping FAILED"
同樣的方法,建立 reducer.py
文件,編寫 Reducer 邏輯,代碼如下:
# -*- coding: utf8 -*- from qcloud_cos_v5 import CosConfig from qcloud_cos_v5 import CosS3Client from qcloud_cos_v5 import CosServiceError from operator import itemgetter import os import sys import datetime import logging region = u'ap-guangzhou' # 根據(jù)實際情況,修改地域 result_bucket = u'destmr' # 根據(jù)實際情況,修改bucket名 logging.basicConfig(level=logging.INFO, stream=sys.stdout) logger = logging.getLogger() logger.setLevel(level=logging.INFO) def delete_file_folder(src): if os.path.isfile(src): try: os.remove(src) except: pass elif os.path.isdir(src): for item in os.listdir(src): itemsrc = os.path.join(src, item) delete_file_folder(itemsrc) try: os.rmdir(src) except: pass def download_file(cos_client, bucket, key, download_path): logger.info("Get from [%s] to download file [%s]" % (bucket, key)) try: response = cos_client.get_object(Bucket=bucket, Key=key, ) response['Body'].get_stream_to_file(download_path) except CosServiceError as e: print(e.get_error_code()) print(e.get_error_msg()) return -1 return 0 def upload_file(cos_client, bucket, key, local_file_path): logger.info("Start to upload file to cos") try: response = cos_client.put_object_from_local_file( Bucket=bucket, LocalFilePath=local_file_path, Key='{}'.format(key)) except CosServiceError as e: print(e.get_error_code()) print(e.get_error_msg()) return -1 logger.info("Upload data map file [%s] Success" % key) return 0 def qcloud_reducer(cos_client, bucket, key, result_bucket, result_key): word2count = {} src_file_path = u'/tmp/' + key.split('/')[-1] result_file_path = u'/tmp/' + u'result_' + key.split('/')[-1] download_ret = download_file(cos_client, bucket, key, src_file_path) if download_ret == 0: map_file = open(src_file_path, 'r') result_file = open(result_file_path, 'w') for line in map_file: line = line.strip() word, count = line.split('t', 1) try: count = int(count) word2count[word] = word2count.get(word, 0) + count except ValueError: logger.error("error value: %s, current line: %s" % (ValueError, line)) continue map_file.close() delete_file_folder(src_file_path) sorted_word2count = sorted(word2count.items(), key=itemgetter(1))[::-1] for wordcount in sorted_word2count: res = '%st%s' % (wordcount[0], wordcount[1]) result_file.write(res) result_file.write('n') result_file.close() upload_ret = upload_file(cos_client, result_bucket, result_key, result_file_path) delete_file_folder(result_file_path) return upload_ret def reduce_caller(event, context, cos_client): appid = event['Records'][0]['cos']['cosBucket']['appid'] bucket = event['Records'][0]['cos']['cosBucket']['name'] + '-' + appid key = event['Records'][0]['cos']['cosObject']['key'] key = key.replace('/' + str(appid) + '/' + event['Records'][0]['cos']['cosBucket']['name'] + '/', '', 1) logger.info("Key is " + key) res_bucket = result_bucket + '-' + appid result_key = '/' + 'result_' + key.split('/')[-1] return qcloud_reducer(cos_client, bucket, key, res_bucket, result_key) def main_handler(event, context): logger.info("start main handler") if "Records" not in event.keys(): return {"errorMsg": "event is not come from cos"} secret_id = "SecretId" secret_key = "SecretKey" config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, ) cos_client = CosS3Client(config) start_time = datetime.datetime.now() res = reduce_caller(event, context, cos_client) end_time = datetime.datetime.now() print("data reducing duration: " + str((end_time - start_time).microseconds / 1000) + "ms") if res == 0: return "Data reducing SUCCESS" else: return "Data reducing FAILED"
遵循 Serverless Framework 的 yaml
規(guī)范,編寫 serveerless.yaml
:
WordCountMapper: component: "@serverless/tencent-scf" inputs: name: mapper codeUri: ./code handler: index.main_handler runtime: Python3.6 region: ap-guangzhou description: 網(wǎng)站監(jiān)控 memorySize: 64 timeout: 20 events: - cos: name: srcmr-1256773370.cos.ap-guangzhou.myqcloud.com parameters: bucket: srcmr-1256773370.cos.ap-guangzhou.myqcloud.com filter: prefix: '' suffix: '' events: cos:ObjectCreated:* enable: true WordCountReducer: component: "@serverless/tencent-scf" inputs: name: reducer codeUri: ./code handler: index.main_handler runtime: Python3.6 region: ap-guangzhou description: 網(wǎng)站監(jiān)控 memorySize: 64 timeout: 20 events: - cos: name: middlestagebucket-1256773370.cos.ap-guangzhou.myqcloud.com parameters: bucket: middlestagebucket-1256773370.cos.ap-guangzhou.myqcloud.com filter: prefix: '' suffix: '' events: cos:ObjectCreated:* enable: true
完成之后,通過 sls --debug
指令進行部署。部署成功之后,進行基本的測試:
準(zhǔn)備一個英文文檔:
登錄騰訊云后臺,打開我們最初建立的存儲桶:srcmr,并上傳該文件;
上傳成功之后,稍等片刻即可看到 Reducer 程序已經(jīng)在 Mapper 執(zhí)行之后,產(chǎn)出日志:
此時,我們打開結(jié)果存儲桶,查看結(jié)果:
現(xiàn)在,我們就完成了簡單的詞頻統(tǒng)計功能。
Serverless 架構(gòu)是適用于大數(shù)據(jù)處理的。在騰訊云官網(wǎng),我們也可以看到其關(guān)于數(shù)據(jù) ETL 處理的場景描述:
本實例中,有一鍵部署多個函數(shù)的操作。在實際生產(chǎn)中,每個項目都不會是單個函數(shù)單打獨斗的,而是多個函數(shù)組合應(yīng)用,形成一個 Service 體系,所以一鍵部署多個函數(shù)就顯得尤為重要。通過本實例,希望讀者可以對 Serverless 架構(gòu)的應(yīng)用場景有更多的了解,并且能有所啟發(fā),將云函數(shù)和不同觸發(fā)器進行組合,應(yīng)用在自身業(yè)務(wù)中。
上述內(nèi)容就是云函數(shù)SCF與對象存儲實現(xiàn)WordCount算法的過程,你們學(xué)到知識或技能了嗎?如果還想學(xué)到更多技能或者豐富自己的知識儲備,歡迎關(guān)注億速云行業(yè)資訊頻道。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。