溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

spark 3.0.1集成delta 0.7.0之delta如何進行DDL操作

發(fā)布時間:2021-12-16 16:15:24 來源:億速云 閱讀:167 作者:小新 欄目:大數(shù)據(jù)

小編給大家分享一下spark 3.0.1集成delta 0.7.0之delta如何進行DDL操作,相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!

分析

delta在0.7.0以前是不能夠進行save表操作的,只能存儲到文件中,也就是說他的元數(shù)據(jù)是和spark的其他元數(shù)據(jù)是分開的,delta是獨立存在的,也是不能和其他表進行關(guān)聯(lián)操作的,只有到了delta 0.7.0版本以后,才真正意義上和spark進行了集成,這也得益于spark 3.x的Catalog plugin API 特性。
還是先從delta的configurate sparksession入手,如下:

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("...")
  .master("...")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()

對于第二個配置 config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") 從spark configuration,我們可以看到對該spark.sql.catalog.spark_catalog的解釋是

A catalog implementation that will be used as the v2 interface to Spark's built-in v1 catalog: spark_catalog. This catalog shares its identifier namespace with the spark_catalog and must be consistent with it; for example, if a table can be loaded by the spark_catalog, this catalog must also return the table metadata. To delegate operations to the spark_catalog, implementations can extend 'CatalogExtension'.

也就是說,通過該配置可以實現(xiàn)元數(shù)據(jù)的統(tǒng)一性,其實這也是spark社區(qū)和delta社區(qū)進行交互的一種結(jié)果

spark 3.x的Catalog plugin API

為了能搞懂delta為什么能夠進行DDL和DML操作,就得先知道spark 3.x的Catalog plugin機制SPARK-31121.

首先是interface CatalogPlugin,該接口是catalog plugin的頂級接口,正如注釋所說:

 * A marker interface to provide a catalog implementation for Spark.
 * <p>
 * Implementations can provide catalog functions by implementing additional interfaces for tables,
 * views, and functions.
 * <p>
 * Catalog implementations must implement this marker interface to be loaded by
 * {@link Catalogs#load(String, SQLConf)}. The loader will instantiate catalog classes using the
 * required public no-arg constructor. After creating an instance, it will be configured by calling
 * {@link #initialize(String, CaseInsensitiveStringMap)}.
 * <p>
 * Catalog implementations are registered to a name by adding a configuration option to Spark:
 * {@code spark.sql.catalog.catalog-name=com.example.YourCatalogClass}. All configuration properties
 * in the Spark configuration that share the catalog name prefix,
 * {@code spark.sql.catalog.catalog-name.(key)=(value)} will be passed in the case insensitive
 * string map of options in initialization with the prefix removed.
 * {@code name}, is also passed and is the catalog's name; in this case, "catalog-name".

可以通過spark.sql.catalog.catalog-name=com.example.YourCatalogClass集成到spark中
該類的實現(xiàn)還可以集成其他額外的tables views functions的接口,這里就得提到接口TableCatalog,該類提供了與tables相關(guān)的方法:

/**
   * List the tables in a namespace from the catalog.
   * <p>
   * If the catalog supports views, this must return identifiers for only tables and not views.
   *
   * @param namespace a multi-part namespace
   * @return an array of Identifiers for tables
   * @throws NoSuchNamespaceException If the namespace does not exist (optional).
   */
  Identifier[] listTables(String[] namespace) throws NoSuchNamespaceException;

  /**
   * Load table metadata by {@link Identifier identifier} from the catalog.
   * <p>
   * If the catalog supports views and contains a view for the identifier and not a table, this
   * must throw {@link NoSuchTableException}.
   *
   * @param ident a table identifier
   * @return the table's metadata
   * @throws NoSuchTableException If the table doesn't exist or is a view
   */
  Table loadTable(Identifier ident) throws NoSuchTableException;

這樣就可以基于TableCatalog開發(fā)自己的catalog,從而實現(xiàn)multi-catalog support

還得有個接口DelegatingCatalogExtension,這是個實現(xiàn)了CatalogExtension接口的抽象類,而CatalogExtension繼承了TableCatalog, SupportsNamespaces。DeltaCatalog實現(xiàn)了DelegatingCatalogExtension ,這部分后續(xù)進行分析。
最后還有一個class CatalogManager,這個類是用來管理CatalogPlugins的,且是線程安全的:

/**
 * A thread-safe manager for [[CatalogPlugin]]s. It tracks all the registered catalogs, and allow
 * the caller to look up a catalog by name.
 *
 * There are still many commands (e.g. ANALYZE TABLE) that do not support v2 catalog API. They
 * ignore the current catalog and blindly go to the v1 `SessionCatalog`. To avoid tracking current
 * namespace in both `SessionCatalog` and `CatalogManger`, we let `CatalogManager` to set/get
 * current database of `SessionCatalog` when the current catalog is the session catalog.
 */
// TODO: all commands should look up table from the current catalog. The `SessionCatalog` doesn't
//       need to track current database at all.
private[sql]
class CatalogManager(
    conf: SQLConf,
    defaultSessionCatalog: CatalogPlugin,
    val v1SessionCatalog: SessionCatalog) extends Logging {

我們看到CatalogManager管理了v2版本的 CatalogPlugin和v1版本的sessionCatalog,這個是因為歷史的原因?qū)е卤仨毜眉嫒輛1版本

那CatalogManager在哪里被調(diào)用呢。 我們看一下BaseSessionStateBuilder ,可以看到該類中才是正宗使用CatalogManager的地方:

/**
   * Catalog for managing table and database states. If there is a pre-existing catalog, the state
   * of that catalog (temp tables & current database) will be copied into the new catalog.
   *
   * Note: this depends on the `conf`, `functionRegistry` and `sqlParser` fields.
   */
  protected lazy val catalog: SessionCatalog = {
    val catalog = new SessionCatalog(
      () => session.sharedState.externalCatalog,
      () => session.sharedState.globalTempViewManager,
      functionRegistry,
      conf,
      SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf),
      sqlParser,
      resourceLoader)
    parentState.foreach(_.catalog.copyStateTo(catalog))
    catalog
  }

  protected lazy val v2SessionCatalog = new V2SessionCatalog(catalog, conf)

  protected lazy val catalogManager = new CatalogManager(conf, v2SessionCatalog, catalog)

SessionCatalog 是v1版本的,主要是跟底層的元數(shù)據(jù)存儲通信,以及管理臨時視圖,udf的,這一部分暫時不分析,重點放到v2版本的sessionCatalog, 我們看一下V2SessionCatalog:

/**
 * A [[TableCatalog]] that translates calls to the v1 SessionCatalog.
 */
class V2SessionCatalog(catalog: SessionCatalog, conf: SQLConf)
  extends TableCatalog with SupportsNamespaces {
  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.NamespaceHelper
  import V2SessionCatalog._

  override val defaultNamespace: Array[String] = Array("default")

  override def name: String = CatalogManager.SESSION_CATALOG_NAME

  // This class is instantiated by Spark, so `initialize` method will not be called.
  override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {}

  override def listTables(namespace: Array[String]): Array[Identifier] = {
    namespace match {
      case Array(db) =>
        catalog
          .listTables(db)
          .map(ident => Identifier.of(Array(ident.database.getOrElse("")), ident.table))
          .toArray
      case _ =>
        throw new NoSuchNamespaceException(namespace)
    }
  }

我們分析一下listTables方法可知,v2的sessionCatalog操作 都是委托給了v1版本的sessionCatalog去操作的,其他的方法也是一樣, 而且name默認為CatalogManager.SESSION_CATALOG_NAME,也就是spark_catalog,這里后面也會提到,注意一下。 而且,catalogmanager在邏輯計劃中的分析器和優(yōu)化器中也會用到,因為會用到其中的元數(shù)據(jù):

protected def analyzer: Analyzer = new Analyzer(catalogManager, conf) {
...
protected def optimizer: Optimizer = {
    new SparkOptimizer(catalogManager, catalog, experimentalMethods) {
      override def earlyScanPushDownRules: Seq[Rule[LogicalPlan]] =
        super.earlyScanPushDownRules ++ customEarlyScanPushDownRules

      override def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] =
        super.extendedOperatorOptimizationRules ++ customOperatorOptimizationRules
    }
  }

而analyzer和optimizer正是spark sql進行解析的核心中的核心,當然還有物理計劃的生成。 那這些analyzer和optimizer是在哪里被調(diào)用呢?
我們舉一個例子,DataSet中的filter方法就調(diào)用了:

 */
  def filter(conditionExpr: String): Dataset[T] = {
    filter(Column(sparkSession.sessionState.sqlParser.parseExpression(conditionExpr)))
  }

sessionState.sqlParser就是剛才所說的sqlParser:

protected lazy val sqlParser: ParserInterface = {
    extensions.buildParser(session, new SparkSqlParser(conf))
  }

只有整個邏輯 從sql解析到使用元數(shù)據(jù)的數(shù)據(jù)鏈路,我們就能大致知道怎么一回事了。

delta的DeltaCatalog

我們回過頭來看看,delta的DeltaCatalog是怎么和spark 3.x進行結(jié)合的 ,上源碼DeltaCatalog:

class DeltaCatalog(val spark: SparkSession) extends DelegatingCatalogExtension
  with StagingTableCatalog
  with SupportsPathIdentifier {

  def this() = {
    this(SparkSession.active)
  }
  ...

就如之前所說的DeltaCatalog繼承了DelegatingCatalogExtension,從名字可以看出這是一個委托類,那到底是怎么委托的呢以及委托給誰呢?

public abstract class DelegatingCatalogExtension implements CatalogExtension {

  private CatalogPlugin delegate;

  public final void setDelegateCatalog(CatalogPlugin delegate) {
    this.delegate = delegate;
  }

該類中有個setDelegateCatalog方法,該方法在CatalogManager中的loadV2SessionCatalog方法中被調(diào)用:

private def loadV2SessionCatalog(): CatalogPlugin = {
    Catalogs.load(SESSION_CATALOG_NAME, conf) match {
      case extension: CatalogExtension =>
        extension.setDelegateCatalog(defaultSessionCatalog)
        extension
      case other => other
    }
  }

而該方法被v2SessionCatalog調(diào)用:

private[sql] def v2SessionCatalog: CatalogPlugin = {
    conf.getConf(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).map { customV2SessionCatalog =>
      try {
        catalogs.getOrElseUpdate(SESSION_CATALOG_NAME, loadV2SessionCatalog())
      } catch {
        case NonFatal(_) =>
          logError(
            "Fail to instantiate the custom v2 session catalog: " + customV2SessionCatalog)
          defaultSessionCatalog
      }
    }.getOrElse(defaultSessionCatalog)
  }

這個就是返回默認的v2版本的SessionCatalog實例,分析一下這個方法:

   首先得到配置項SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION,也就是spark.sql.catalog.spark_catalog配置,
   如果spark配置了的話,就調(diào)用loadV2SessionCatalog加載該類,,否則就加載默認的v2SessionCatalog,也就是V2SessionCatalog實例

這里我們就發(fā)現(xiàn)了:
delta配置的spark.sql.catalog.spark_catalog為"org.apache.spark.sql.delta.catalog.DeltaCatalog",也就是說,spark中的V2SessionCatalog是DeltaCatalog的實例,而DeltaCatalog的委托給了BaseSessionStateBuilder中的V2SessionCatalog實例。

具體看看DeltaCatalog 的createTable方法,其他的方法類似:

override def createTable(
      ident: Identifier,
      schema: StructType,
      partitions: Array[Transform],
      properties: util.Map[String, String]): Table = {
    if (DeltaSourceUtils.isDeltaDataSourceName(getProvider(properties))) {
      createDeltaTable(
        ident, schema, partitions, properties, sourceQuery = None, TableCreationModes.Create)
    } else {
      super.createTable(ident, schema, partitions, properties)
    }
  }
...
private def createDeltaTable(
      ident: Identifier,
      schema: StructType,
      partitions: Array[Transform],
      properties: util.Map[String, String],
      sourceQuery: Option[LogicalPlan],
      operation: TableCreationModes.CreationMode): Table = {
     ...
    val tableDesc = new CatalogTable(
      identifier = TableIdentifier(ident.name(), ident.namespace().lastOption),
      tableType = tableType,
      storage = storage,
      schema = schema,
      provider = Some("delta"),
      partitionColumnNames = partitionColumns,
      bucketSpec = maybeBucketSpec,
      properties = tableProperties.toMap,
      comment = Option(properties.get("comment")))
    // END: copy-paste from the super method finished.

    val withDb = verifyTableAndSolidify(tableDesc, None)
    ParquetSchemaConverter.checkFieldNames(tableDesc.schema.fieldNames)
    CreateDeltaTableCommand(
      withDb,
      getExistingTableIfExists(tableDesc),
      operation.mode,
      sourceQuery,
      operation,
      tableByPath = isByPath).run(spark)

    loadTable(ident)
      }
 override def loadTable(ident: Identifier): Table = {
    try {
      super.loadTable(ident) match {
        case v1: V1Table if DeltaTableUtils.isDeltaTable(v1.catalogTable) =>
          DeltaTableV2(
            spark,
            new Path(v1.catalogTable.location),
            catalogTable = Some(v1.catalogTable),
            tableIdentifier = Some(ident.toString))
        case o => o
      }
    
  }
  • 判斷是否是delta數(shù)據(jù)源,如果是的話,跳到createDeltaTable方法,否則直接調(diào)用super.createTable方法,

  • createDeltaTable先會進行delta特有的CreateDeltaTableCommand.run()命令寫入delta數(shù)據(jù),之后載loadTable

  • loadTable則會調(diào)用super的loadTable,而方法會調(diào)用V2SessionCatalog的loadTable,而V2SessionCatalog最終會調(diào)用v1版本sessionCatalog的getTableMetadata方法,從而組成V1Table(catalogTable)返回,這樣就把delta的元數(shù)據(jù)信息持久化到了v1 SessionCatalog管理的元數(shù)據(jù)庫中

  • 如果不是delta數(shù)據(jù)源,則調(diào)用super.createTable方法,該方法調(diào)用V2SessionCatalog的createTable,而最終還是調(diào)用v1版本sessionCatalog的createTable方法

我們這里重點分析了delta數(shù)據(jù)源到元數(shù)據(jù)的存儲,非delta數(shù)據(jù)源的代碼就沒有粘貼過來,有興趣的自己可以編譯源碼跟蹤一下

我們還得提一下spark.sql.defaultCatalog的默認配置為spark_catalog,也就是sql的默認catalog為spark_catalog,對應到delta的話就是DeltaCatalog。

以上是“spark 3.0.1集成delta 0.7.0之delta如何進行DDL操作”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對大家有所幫助,如果還想學習更多知識,歡迎關(guān)注億速云行業(yè)資訊頻道!

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI