您好,登錄后才能下訂單哦!
如何用隨機(jī)森林算法實(shí)現(xiàn)scikit-learn、Spark MLlib、DolphinDB、xgboost的性能對(duì)比測(cè)試,相信很多沒(méi)有經(jīng)驗(yàn)的人對(duì)此束手無(wú)策,為此本文總結(jié)了問(wèn)題出現(xiàn)的原因和解決方法,通過(guò)這篇文章希望你能解決這個(gè)問(wèn)題。
隨機(jī)森林是常用的機(jī)器學(xué)習(xí)算法,既可以用于分類問(wèn)題,也可用于回歸問(wèn)題。本文對(duì)scikit-learn、Spark MLlib、DolphinDB、xgboost四個(gè)平臺(tái)的隨機(jī)森林算法實(shí)現(xiàn)進(jìn)行對(duì)比測(cè)試。評(píng)價(jià)指標(biāo)包括內(nèi)存占用、運(yùn)行速度和分類準(zhǔn)確性。本次測(cè)試使用模擬生成的數(shù)據(jù)作為輸入進(jìn)行二分類訓(xùn)練,并用生成的模型對(duì)模擬數(shù)據(jù)進(jìn)行預(yù)測(cè)。
本次測(cè)試使用的各平臺(tái)版本如下:
scikit-learn:Python 3.7.1,scikit-learn 0.20.2
Spark MLlib:Spark 2.0.2,Hadoop 2.7.2
DolphinDB:0.82
xgboost:Python package,0.81
CPU:Intel(R) Xeon(R) CPU E5-2650 v4 2.20GHz(共24核48線程)
RAM:512GB
操作系統(tǒng):CentOS Linux release 7.5.1804
在各平臺(tái)上進(jìn)行測(cè)試時(shí),都會(huì)把數(shù)據(jù)加載到內(nèi)存中再進(jìn)行計(jì)算,因此隨機(jī)森林算法的性能與磁盤無(wú)關(guān)。
本次測(cè)試使用DolphinDB腳本產(chǎn)生模擬數(shù)據(jù),并導(dǎo)出為CSV文件。訓(xùn)練集平均分成兩類,每個(gè)類別的特征列分別服從兩個(gè)中心不同,標(biāo)準(zhǔn)差相同,且兩兩獨(dú)立的多元正態(tài)分布N(0, 1)和N(2/sqrt(20), 1)。訓(xùn)練集中沒(méi)有空值。
假設(shè)訓(xùn)練集的大小為n行p列。本次測(cè)試中n的取值為10,000、100,000、1,000,000,p的取值為50。
由于測(cè)試集和訓(xùn)練集獨(dú)立同分布,測(cè)試集的大小對(duì)模型準(zhǔn)確性評(píng)估沒(méi)有顯著影響。本次測(cè)試對(duì)于所有不同大小的訓(xùn)練集都采用1000行的模擬數(shù)據(jù)作為測(cè)試集。
產(chǎn)生模擬數(shù)據(jù)的DolphinDB腳本見(jiàn)附錄1。
在各個(gè)平臺(tái)中都采用以下參數(shù)進(jìn)行隨機(jī)森林模型訓(xùn)練:
樹(shù)的棵數(shù):500
最大深度:分別在4個(gè)平臺(tái)中測(cè)試了最大深度為10和30兩種情況
劃分節(jié)點(diǎn)時(shí)選取的特征數(shù):總特征數(shù)的平方根,即integer(sqrt(50))=7
劃分節(jié)點(diǎn)時(shí)的不純度(Impurity)指標(biāo):基尼指數(shù)(Gini index),該參數(shù)僅對(duì)Python scikit-learn、Spark MLlib和DolphinDB有效
采樣的桶數(shù):32,該參數(shù)僅對(duì)Spark MLlib和DolphinDB有效
并發(fā)任務(wù)數(shù):CPU線程數(shù),Python scikit-learn、Spark MLlib和DolphinDB取48,xgboost取24。
在測(cè)試xgboost時(shí),嘗試了參數(shù)nthread(表示運(yùn)行時(shí)的并發(fā)線程數(shù))的不同取值。但當(dāng)該參數(shù)取值為本次測(cè)試環(huán)境的線程數(shù)(48)時(shí),性能并不理想。進(jìn)一步觀察到,在線程數(shù)小于10時(shí),性能與取值成正相關(guān)。在線程數(shù)大于10小于24時(shí),不同取值的性能差異不明顯,此后,線程數(shù)增加時(shí)性能反而下降。該現(xiàn)象在xgboost社區(qū)中也有人討論過(guò)。因此,本次測(cè)試在xgboost中最終使用的線程數(shù)為24。
測(cè)試腳本見(jiàn)附錄2~5。
當(dāng)樹(shù)的數(shù)量為500,最大深度為10時(shí),測(cè)試結(jié)果如下表所示:
當(dāng)樹(shù)的數(shù)量為500,最大深度為30時(shí),測(cè)試結(jié)果如下表所示:
從準(zhǔn)確率上看,Python scikit-learn、Spark MLlib和DolphinDB的準(zhǔn)確率比較相近,略高于xgboost的實(shí)現(xiàn);從性能上看,從高到低依次為DolphinDB、Python scikit-learn、xgboost、Spark MLlib。
在本次測(cè)試中,Python scikit-learn的實(shí)現(xiàn)使用了所有CPU核。
Spark MLlib的實(shí)現(xiàn)沒(méi)有充分使用所有CPU核,內(nèi)存占用最高,當(dāng)數(shù)據(jù)量為10,000時(shí),CPU峰值占用率約8%,當(dāng)數(shù)據(jù)量為100,000時(shí),CPU峰值占用率約為25%,當(dāng)數(shù)據(jù)量為1,000,000時(shí),它會(huì)因?yàn)閮?nèi)存不足而中斷執(zhí)行。
DolphinDB database 的實(shí)現(xiàn)使用了所有CPU核,并且它是所有實(shí)現(xiàn)中速度最快的,但內(nèi)存占用是scikit-learn的2-7倍,是xgboost的3-9倍。DolphinDB的隨機(jī)森林算法實(shí)現(xiàn)提供了numJobs參數(shù),可以通過(guò)調(diào)整該參數(shù)來(lái)降低并行度,從而減少內(nèi)存占用。詳情請(qǐng)參考DolphinDB用戶手冊(cè)。
xgboost常用于boosted trees的訓(xùn)練,也能進(jìn)行隨機(jī)森林算法。它是算法迭代次數(shù)為1時(shí)的特例。xgboost實(shí)際上在24線程左右時(shí)性能最高,其對(duì)CPU線程的利用率不如Python和DolphinDB,速度也不及兩者。其優(yōu)勢(shì)在于內(nèi)存占用最少。另外,xgboost的具體實(shí)現(xiàn)也和其他平臺(tái)的實(shí)現(xiàn)有所差異。例如,沒(méi)有bootstrap這一過(guò)程,對(duì)數(shù)據(jù)使用無(wú)放回抽樣而不是有放回抽樣。這可以解釋為何它的準(zhǔn)確率略低于其它平臺(tái)。
Python scikit-learn的隨機(jī)森林算法實(shí)現(xiàn)在性能、內(nèi)存開(kāi)銷和準(zhǔn)確率上的表現(xiàn)比較均衡,Spark MLlib的實(shí)現(xiàn)在性能和內(nèi)存開(kāi)銷上的表現(xiàn)遠(yuǎn)遠(yuǎn)不如其他平臺(tái)。DolphinDB的隨機(jī)森林算法實(shí)現(xiàn)性能最優(yōu),并且DolphinDB的隨機(jī)森林算法和數(shù)據(jù)庫(kù)是無(wú)縫集成的,用戶可以直接對(duì)數(shù)據(jù)庫(kù)中的數(shù)據(jù)進(jìn)行訓(xùn)練和預(yù)測(cè),并且提供了numJobs參數(shù),實(shí)現(xiàn)內(nèi)存和速度之間的平衡。而xgboost的隨機(jī)森林只是迭代次數(shù)為1時(shí)的特例,具體實(shí)現(xiàn)和其他平臺(tái)差異較大,最佳的應(yīng)用場(chǎng)景為boosted tree。
1. 模擬生成數(shù)據(jù)的DolphinDB腳本
def genNormVec(cls, a, stdev, n) { return norm(cls * a, stdev, n) } def genNormData(dataSize, colSize, clsNum, scale, stdev) { t = table(dataSize:0, `cls join ("col" + string(0..(colSize-1))), INT join take(DOUBLE,colSize)) classStat = groupby(count,1..dataSize, rand(clsNum, dataSize)) for(row in classStat){ cls = row.groupingKey classSize = row.count cols = [take(cls, classSize)] for (i in 0:colSize) cols.append!(genNormVec(cls, scale, stdev, classSize)) tmp = table(dataSize:0, `cls join ("col" + string(0..(colSize-1))), INT join take(DOUBLE,colSize)) insert into t values (cols) cols = NULL tmp = NULL } return t } colSize = 50 clsNum = 2 t1m = genNormData(10000, colSize, clsNum, 2 / sqrt(20), 1.0) saveText(t1m, "t10k.csv") t10m = genNormData(100000, colSize, clsNum, 2 / sqrt(20), 1.0) saveText(t10m, "t100k.csv") t100m = genNormData(1000000, colSize, clsNum, 2 / sqrt(20), 1.0) saveText(t100m, "t1m.csv") t1000 = genNormData(1000, colSize, clsNum, 2 / sqrt(20), 1.0) saveText(t1000, "t1000.csv")
2. Python scikit-learn的訓(xùn)練和預(yù)測(cè)腳本
import pandas as pd import numpy as np from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor from time import * test_df = pd.read_csv("t1000.csv") def evaluate(path, model_name, num_trees=500, depth=30, num_jobs=1): df = pd.read_csv(path) y = df.values[:,0] x = df.values[:,1:] test_y = test_df.values[:,0] test_x = test_df.values[:,1:] rf = RandomForestClassifier(n_estimators=num_trees, max_depth=depth, n_jobs=num_jobs) start = time() rf.fit(x, y) end = time() elapsed = end - start print("Time to train model %s: %.9f seconds" % (model_name, elapsed)) acc = np.mean(test_y == rf.predict(test_x)) print("Model %s accuracy: %.3f" % (model_name, acc)) evaluate("t10k.csv", "10k", 500, 10, 48) # choose your own parameter
3. Spark MLlib的訓(xùn)練和預(yù)測(cè)代碼(Scala實(shí)現(xiàn))
import org.apache.spark.mllib.tree.configuration.FeatureType.Continuous import org.apache.spark.mllib.tree.model.{DecisionTreeModel, Node} object Rf { def main(args: Array[String]) = { evaluate("/t100k.csv", 500, 10) // choose your own parameter } def processCsv(row: Row) = { val label = row.getString(0).toDouble val featureArray = (for (i <- 1 to (row.size-1)) yield row.getString(i).toDouble).toArray val features = Vectors.dense(featureArray) LabeledPoint(label, features) } def evaluate(path: String, numTrees: Int, maxDepth: Int) = { val spark = SparkSession.builder.appName("Rf").getOrCreate() import spark.implicits._ val numClasses = 2 val categoricalFeaturesInfo = Map[Int, Int]() val featureSubsetStrategy = "sqrt" val impurity = "gini" val maxBins = 32 val d_test = spark.read.format("CSV").option("header","true").load("/t1000.csv").map(processCsv).rdd d_test.cache() println("Loading table (1M * 50)") val d_train = spark.read.format("CSV").option("header","true").load(path).map(processCsv).rdd d_train.cache() println("Training table (1M * 50)") val now = System.nanoTime val model = RandomForest.trainClassifier(d_train, numClasses, categoricalFeaturesInfo, numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins) println(( System.nanoTime - now )/1e9) val scoreAndLabels = d_test.map { point => val score = model.trees.map(tree => softPredict2(tree, point.features)).sum if (score * 2 > model.numTrees) (1.0, point.label) else (0.0, point.label) } val metrics = new MulticlassMetrics(scoreAndLabels) println(metrics.accuracy) } def softPredict(node: Node, features: Vector): Double = { if (node.isLeaf) { //if (node.predict.predict == 1.0) node.predict.prob else 1.0 - node.predict.prob node.predict.predict } else { if (node.split.get.featureType == Continuous) { if (features(node.split.get.feature) <= node.split.get.threshold) { softPredict(node.leftNode.get, features) } else { softPredict(node.rightNode.get, features) } } else { if (node.split.get.categories.contains(features(node.split.get.feature))) { softPredict(node.leftNode.get, features) } else { softPredict(node.rightNode.get, features) } } } } def softPredict2(dt: DecisionTreeModel, features: Vector): Double = { softPredict(dt.topNode, features) } }
4. DolphinDB的訓(xùn)練和預(yù)測(cè)腳本
def createInMemorySEQTable(t, seqSize) { db = database("", SEQ, seqSize) dataSize = t.size() ts = () for (i in 0:seqSize) { ts.append!(t[(i * (dataSize/seqSize)):((i+1)*(dataSize/seqSize))]) } return db.createPartitionedTable(ts, `tb) } def accuracy(v1, v2) { return (v1 == v2).sum() \ v2.size() } def evaluateUnparitioned(filePath, numTrees, maxDepth, numJobs) { test = loadText("t1000.csv") t = loadText(filePath); clsNum = 2; colSize = 50 timer res = randomForestClassifier(sqlDS(<select * from t>), `cls, `col + string(0..(colSize-1)), clsNum, sqrt(colSize).int(), numTrees, 32, maxDepth, 0.0, numJobs) print("Unpartitioned table accuracy = " + accuracy(res.predict(test), test.cls).string()) } evaluateUnpartitioned("t10k.csv", 500, 10, 48) // choose your own parameter
5. xgboost的訓(xùn)練和預(yù)測(cè)腳本
import pandas as pd import numpy as np import xgboost as xgb from time import * def load_csv(path): df = pd.read_csv(path) target = df['cls'] df = df.drop(['cls'], axis=1) return xgb.DMatrix(df.values, label=target.values) dtest = load_csv('/hdd/hdd1/twonormData/t1000.csv') def evaluate(path, num_trees, max_depth, num_jobs): dtrain = load_csv(path) param = {'num_parallel_tree':num_trees, 'max_depth':max_depth, 'objective':'binary:logistic', 'nthread':num_jobs, 'colsample_bylevel':1/np.sqrt(50)} start = time() model = xgb.train(param, dtrain, 1) end = time() elapsed = end - start print("Time to train model: %.9f seconds" % elapsed) prediction = model.predict(dtest) > 0.5 print("Accuracy = %.3f" % np.mean(prediction == dtest.get_label())) evaluate('t10k.csv', 500, 10, 24) // choose your own parameter
看完上述內(nèi)容,你們掌握如何用隨機(jī)森林算法實(shí)現(xiàn)scikit-learn、Spark MLlib、DolphinDB、xgboost的性能對(duì)比測(cè)試的方法了嗎?如果還想學(xué)到更多技能或想了解更多相關(guān)內(nèi)容,歡迎關(guān)注億速云行業(yè)資訊頻道,感謝各位的閱讀!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。