溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Spark怎么寫HBASE

發(fā)布時間:2021-12-09 10:26:08 來源:億速云 閱讀:137 作者:小新 欄目:大數(shù)據(jù)

這篇文章將為大家詳細(xì)講解有關(guān)Spark怎么寫HBASE,小編覺得挺實(shí)用的,因此分享給大家做個參考,希望大家閱讀完這篇文章后可以有所收獲。

package com.iesol.high_frequency
import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import scala.util.control._;
import java.nio.file.Path;
import java.nio.file.Paths;
import com.isesol.mapreduce.binFileRead_forscala
import java.util.List;
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.spark._
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.filter._
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
import org.apache.hadoop.hbase.client.HTable
import scala.util.Random


object parseFile {


  def main(args: Array[String]) {


    /*
     * fileName 文件名稱
     * 
     * appId App請求發(fā)出者
     * 
     * bizId 業(yè)務(wù)唯一好嗎
     * 
     */
        val fileName = args(0)
        val appId = args(1)
        val machine_tool = args(2)
        val bizId = args(3)
        
        //colId 表示目前只有一種高頻采集,通過colID找到對應(yīng)的表字段個數(shù)
        val colId = "1"


    val conf = new SparkConf()
    conf.setMaster("local").setAppName("high frequency collection " + appId)
    val sc = new SparkContext(conf)
    val hbaseCols = binFileRead_forscala.getHaseCols(colId)
    val total_colNums = hbaseCols.size()
    val getFile = binFileRead_forscala.binFileOut(fileName, total_colNums)
    val getData = new Array[String](getFile.size())
    for (num <- 0 to getFile.size() - 1) {
      getData(num) = getFile.get(num)
    }


    val hbaseCols_scala = new Array[String](hbaseCols.size())


    for (num <- 0 to hbaseCols.size() - 1) {
      hbaseCols_scala(num) = hbaseCols.get(num)
      println("hbase cols is " + hbaseCols_scala(num))
    }


    val bankRDD = sc.parallelize(getData).map { x => x.split(",") }


    try {
      bankRDD.foreachPartition { x =>
        var count = 0
        val hbaseconf = HBaseConfiguration.create()
        hbaseconf.set("hbase.zookeeper.quorum", "datanode01.isesol.com,datanode02.isesol.com,datanode03.isesol.com,datanode04.isesol.com,cmserver.isesol.com")
        hbaseconf.set("hbase.zookeeper.property.clientPort", "2181")
        hbaseconf.set("maxSessionTimeout", "6")
        val myTable = new HTable(hbaseconf, TableName.valueOf("t_high_frequently"))
        // myTable.setAutoFlush(true)
        myTable.setWriteBufferSize(3 * 1024 * 1024)
        x.foreach { y =>
          {


            var rowkey = System.currentTimeMillis().toString()
            val p = new Put(Bytes.toBytes(machine_tool + "-" +  appId + "-" + bizId + "-" + rowkey))


            for (i <- 0 to hbaseCols_scala.size - 1) {
              p.add(Bytes.toBytes("cf"), Bytes.toBytes(hbaseCols_scala(i)), Bytes.toBytes(y(i)))
            }


            /* p.add(Bytes.toBytes("cf"), Bytes.toBytes("POSONSCREEN XC"), Bytes.toBytes(y(0)))
            p.add(Bytes.toBytes("cf"), Bytes.toBytes("AXFEEDBACKVEL X"), Bytes.toBytes(y(1)))
            p.add(Bytes.toBytes("cf"), Bytes.toBytes("AXFEEDBACKPOS X"), Bytes.toBytes(y(2))) */
            
            myTable.put(p)


          }


        }
        myTable.flushCommits()
        myTable.close()
      }
    } catch {
      case ex: Exception => println("can not connect hbase")
    }
  }
}

關(guān)于“Spark怎么寫HBASE”這篇文章就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,使各位可以學(xué)到更多知識,如果覺得文章不錯,請把它分享出去讓更多的人看到。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI