溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

如何使用Function Compute對表格存儲中數(shù)據(jù)做簡單清洗

發(fā)布時間:2022-01-05 13:46:38 來源:億速云 閱讀:151 作者:柒染 欄目:云計算

今天就跟大家聊聊有關(guān) 如何使用Function Compute對表格存儲中數(shù)據(jù)做簡單清洗,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結(jié)了以下內(nèi)容,希望大家根據(jù)這篇文章可以有所收獲。

函數(shù)計算(Function Compute) 是一個事件驅(qū)動的服務(wù),通過函數(shù)計算,用戶無需管理服務(wù)器等運(yùn)行情況,只需編寫代碼并上傳。函數(shù)計算準(zhǔn)備計算資源,并以彈性伸縮的方式運(yùn)行用戶代碼,而用戶只需根據(jù)實際代碼運(yùn)行所消耗的資源進(jìn)行付費。

Table Store Stream是用于獲取Table Store表中增量數(shù)據(jù)的一個數(shù)據(jù)通道,通過創(chuàng)建Table Store觸發(fā)器,能夠?qū)崿F(xiàn)Table Store Stream和函數(shù)計算的自動對接,讓計算函數(shù)中自定義的程序邏輯自動處理Table Store表中發(fā)生的數(shù)據(jù)修改。

表格存儲高并發(fā)的寫入性能以及低廉的存儲成本非常適合物聯(lián)網(wǎng)、日志、監(jiān)控數(shù)據(jù)的存儲,我們可以將數(shù)據(jù)寫入到表格存儲中,同時在函數(shù)計算中對新增的數(shù)據(jù)做簡單的清洗、轉(zhuǎn)換、聚合計算等操作,并將清洗之后的數(shù)據(jù)寫回到表格存儲的結(jié)果表中,并對原始明細(xì)數(shù)據(jù)及結(jié)果數(shù)據(jù)提供實時訪問。

下面,我們使用函數(shù)計算對表格存儲中的數(shù)據(jù)做簡單的清洗,并寫入到結(jié)果表中。

數(shù)據(jù)定義

我們假設(shè)寫入的為日志數(shù)據(jù),包括三個基礎(chǔ)字段:

字段名稱類型含義
id整型日志id
level整型日志的等級,越大表明等級越高
message字符串日志的內(nèi)容

我們需要將 level>1 的日志寫入到另外一張數(shù)據(jù)表中,用作專門的查詢。

實現(xiàn)過程:

創(chuàng)建實例及數(shù)據(jù)表

在表格存儲的控制臺創(chuàng)建表格存儲實例(__本次以 華東2 distribute-test 為例__),并創(chuàng)建源表(__source_data__)及結(jié)果表(__result__),主鍵為均 __id (整型)__,由于表格存儲是 schemafree 結(jié)構(gòu),無需預(yù)先定義其他屬性列字段。

如何使用Function Compute對表格存儲中數(shù)據(jù)做簡單清洗

開啟數(shù)據(jù)源表的Stream功能

觸發(fā)器功能需要先開啟數(shù)據(jù)表的Stream功能,才能在函數(shù)計算中處理寫入表格存儲中的增量數(shù)據(jù)。

如何使用Function Compute對表格存儲中數(shù)據(jù)做簡單清洗

Stream記錄過期時長 為通過 StreamAPI 能夠讀取到的增量數(shù)據(jù)的最長時間。

由于觸發(fā)器只能綁定現(xiàn)有的函數(shù),故先到函數(shù)計算的控制臺上在同region創(chuàng)建服務(wù)及函數(shù)。

創(chuàng)建函數(shù)計算服務(wù)

在函數(shù)計算的控制臺上創(chuàng)建服務(wù)及處理函數(shù),我們繼續(xù)使用華東2節(jié)點。

1.在華東2節(jié)點創(chuàng)建服務(wù)。

如何使用Function Compute對表格存儲中數(shù)據(jù)做簡單清洗

2.創(chuàng)建函數(shù)依次選擇:空白函數(shù)——不創(chuàng)建觸發(fā)器。

如何使用Function Compute對表格存儲中數(shù)據(jù)做簡單清洗

如何使用Function Compute對表格存儲中數(shù)據(jù)做簡單清洗

  • 函數(shù)名稱為:etl_test,選擇 python2.7 環(huán)境,在線編輯代碼

  • 函數(shù)入口為:etl_test.handler

  • 代碼稍后編輯,點擊下一步。

3.進(jìn)行服務(wù)授權(quán)

由于函數(shù)計算需要將運(yùn)行中的日志寫入到日志服務(wù)中,同時,需要對表格存儲的表進(jìn)行讀寫,故需要對函數(shù)計算進(jìn)行授權(quán),為方便起見,我們先添加 AliyunOTSFullAccess 與 __AliyunLogFullAccess __權(quán)限,實際生產(chǎn)中,建議根據(jù)權(quán)限最小原則來添加權(quán)限。

如何使用Function Compute對表格存儲中數(shù)據(jù)做簡單清洗

4.點擊授權(quán)完成,并創(chuàng)建函數(shù)。

5.修改函數(shù)代碼。

創(chuàng)建好函數(shù)之后,點擊對應(yīng)的函數(shù)代碼執(zhí)行,編輯代碼并保存,其中,INSTANCE_NAME(表格存儲的實例名稱)、REGION(使用的區(qū)域)需要根據(jù)情況進(jìn)行修改:如何使用Function Compute對表格存儲中數(shù)據(jù)做簡單清洗

使用示例代碼如下:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import cbor
import json
import tablestore as ots

INSTANCE_NAME = 'distribute-test'
REGION = 'cn-shanghai'
ENDPOINT = 'http://%s.%s.ots-internal.aliyuncs.com'%(INSTANCE_NAME, REGION)
RESULT_TABLENAME = 'result'


def _utf8(input):
    return str(bytearray(input, "utf-8"))

def get_attrbute_value(record, column):
    attrs = record[u'Columns']
    for x in attrs:
        if x[u'ColumnName'] == column:
            return x['Value']

def get_pk_value(record, column):
    attrs = record[u'PrimaryKey']
    for x in attrs:
        if x['ColumnName'] == column:
            return x['Value']

#由于已經(jīng)授權(quán)了AliyunOTSFullAccess權(quán)限,此處獲取的credentials具有訪問表格存儲的權(quán)限
def get_ots_client(context):
    creds = context.credentials
    client = ots.OTSClient(ENDPOINT, creds.accessKeyId, creds.accessKeySecret, INSTANCE_NAME, sts_token = creds.securityToken)
    return client

def save_to_ots(client, record):
    id = int(get_pk_value(record, 'id'))
    level = int(get_attrbute_value(record, 'level'))
    msg = get_attrbute_value(record, 'message')

    pk = [(_utf8('id'), id),]
    attr = [(_utf8('level'), level), (_utf8('message'), _utf8(msg)),]
    row = ots.Row(pk, attr)
    client.put_row(RESULT_TABLENAME, row)

def handler(event, context):
    records = cbor.loads(event)
    #records = json.loads(event)
    client = get_ots_client(context)
    for record in records['Records']:
        level = int(get_attrbute_value(record, 'level'))
        if level > 1:
            save_to_ots(client, record)
        else:
            print "Level <= 1, ignore."

對表格存儲 Stream 數(shù)據(jù)的格式詳情請參考Stream 數(shù)據(jù)處理

綁定觸發(fā)器

1.回到表格存儲的實例管理頁面,點擊表 source_data 后的 使用觸發(fā)器 按鈕,進(jìn)入觸發(fā)器綁定界面,點擊使用已有函數(shù)計算, 選擇剛創(chuàng)建的服務(wù)及函數(shù),勾選 表格存儲發(fā)送事件通知的權(quán)限, 進(jìn)行確定。如何使用Function Compute對表格存儲中數(shù)據(jù)做簡單清洗

2.綁定成功之后,能夠看到如下的信息:如何使用Function Compute對表格存儲中數(shù)據(jù)做簡單清洗

運(yùn)行驗證

1.向 source_data 表中寫入數(shù)據(jù)。如何使用Function Compute對表格存儲中數(shù)據(jù)做簡單清洗

2.在 result 表中查詢清洗后的數(shù)據(jù)

點擊 result 表的數(shù)據(jù)管理頁面,會查詢到剛寫入到 source_data 中的數(shù)據(jù)。
當(dāng)然,向 soure_data 寫入level <=1的數(shù)據(jù)將不會同步到 result 表中

如何使用Function Compute對表格存儲中數(shù)據(jù)做簡單清洗

看完上述內(nèi)容,你們對 如何使用Function Compute對表格存儲中數(shù)據(jù)做簡單清洗有進(jìn)一步的了解嗎?如果還想了解更多知識或者相關(guān)內(nèi)容,請關(guān)注億速云行業(yè)資訊頻道,感謝大家的支持。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI