SparkConnectDatabaseMetaData.scala

@@ -25,6 +25,8 @@ import org.apache.spark.util.VersionUtils
 
 class SparkConnectDatabaseMetaData(conn: SparkConnectConnection) extends DatabaseMetaData {
 
+  import conn.spark.implicits._
+
   override def allProceduresAreCallable: Boolean = false
 
   override def allTablesAreSelectable: Boolean = false
@@ -288,8 +290,14 @@ class SparkConnectDatabaseMetaData(conn: SparkConnectConnection) extends DatabaseMetaData
       columnNamePattern: String): ResultSet =
     throw new SQLFeatureNotSupportedException
 
-  override def getCatalogs: ResultSet =
-    throw new SQLFeatureNotSupportedException
+  override def getCatalogs: ResultSet = {
+    conn.checkOpen()
+
+    val df = conn.spark.sql("SHOW CATALOGS")
+      .select($"catalog".as("TABLE_CAT"))
+      .orderBy("TABLE_CAT")
+    new SparkConnectResultSet(df.collectResult())
+  }
 
   override def getSchemas: ResultSet =
     throw new SQLFeatureNotSupportedException
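For reference, with this change a JDBC client can enumerate catalogs through the standard `DatabaseMetaData` API instead of hitting `SQLFeatureNotSupportedException`. A minimal client-side sketch, not part of this PR; the endpoint `jdbc:sc://localhost:15002` is a placeholder:

```scala
import java.sql.DriverManager

import scala.util.Using

object ListCatalogs {
  def main(args: Array[String]): Unit = {
    // Placeholder endpoint: point this at a running Spark Connect server.
    Using.resource(DriverManager.getConnection("jdbc:sc://localhost:15002")) { conn =>
      // getCatalogs exposes a single TABLE_CAT column, ordered ascending.
      Using.resource(conn.getMetaData.getCatalogs) { rs =>
        while (rs.next()) {
          println(rs.getString("TABLE_CAT"))
        }
      }
    }
  }
}
```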
SparkConnectDatabaseMetaDataSuite.scala
@@ -19,16 +19,27 @@ package org.apache.spark.sql.connect.client.jdbc
 
 import java.sql.{Array => _, _}
 
+import scala.util.Using
+
 import org.apache.spark.SparkBuildInfo.{spark_version => SPARK_VERSION}
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.connect.client.jdbc.test.JdbcHelper
-import org.apache.spark.sql.connect.test.{ConnectFunSuite, RemoteSparkSession}
+import org.apache.spark.sql.connect.test.{ConnectFunSuite, RemoteSparkSession, SQLHelper}
 import org.apache.spark.util.VersionUtils
 
 class SparkConnectDatabaseMetaDataSuite extends ConnectFunSuite with RemoteSparkSession
-  with JdbcHelper {
+  with JdbcHelper with SQLHelper {
 
   def jdbcUrl: String = s"jdbc:sc://localhost:$serverPort"
 
+  // The catalyst test jar is inaccessible here, but is present on the testing Connect server's classpath.
+  private val TEST_IN_MEMORY_CATALOG = "org.apache.spark.sql.connector.catalog.InMemoryCatalog"
+
+  private def registerCatalog(
+      name: String, className: String)(implicit spark: SparkSession): Unit = {
+    spark.conf.set(s"spark.sql.catalog.$name", className)
+  }
+
   test("SparkConnectDatabaseMetaData simple methods") {
     withConnection { conn =>
       val spark = conn.asInstanceOf[SparkConnectConnection].spark
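For context on the `registerCatalog` helper above: setting `spark.sql.catalog.<name>` registers a catalog plugin, and Spark instantiates it lazily on first use; that is why the test below issues `USE` statements before `SHOW CATALOGS` can list the catalogs. A hedged SQL-only sketch of the same registration, reusing the test's `testcat` name:

```scala
// Equivalent registration via SQL SET (sketch; assumes an active `spark` session).
spark.sql("SET spark.sql.catalog.testcat=org.apache.spark.sql.connector.catalog.InMemoryCatalog")
// The catalog is instantiated lazily; referencing it forces initialization,
// after which SHOW CATALOGS will include it.
spark.sql("USE testcat")
spark.sql("SHOW CATALOGS").show()
```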
@@ -199,4 +210,29 @@ class SparkConnectDatabaseMetaDataSuite extends ConnectFunSuite with RemoteSparkSession
       // scalastyle:on line.size.limit
     }
   }
+
+  test("SparkConnectDatabaseMetaData getCatalogs") {
+    withConnection { conn =>
+      implicit val spark: SparkSession = conn.asInstanceOf[SparkConnectConnection].spark
+
+      registerCatalog("testcat", TEST_IN_MEMORY_CATALOG)
+      registerCatalog("testcat2", TEST_IN_MEMORY_CATALOG)
+
+      // Forcibly initialize the registered catalogs, because SHOW CATALOGS only
+      // returns catalogs that have already been initialized.
+      spark.sql("USE testcat")
+      spark.sql("USE testcat2")
+      spark.sql("USE spark_catalog")
+
+      val metadata = conn.getMetaData
+      Using.resource(metadata.getCatalogs) { rs =>
+        val catalogs = new Iterator[String] {
+          def hasNext: Boolean = rs.next()
+          def next(): String = rs.getString("TABLE_CAT")
+        }.toSeq
+        // results are ordered by TABLE_CAT
+        assert(catalogs === Seq("spark_catalog", "testcat", "testcat2"))
+      }
+    }
+  }
 }
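One note on the draining pattern in the `getCatalogs` test: the anonymous `Iterator` delegates `hasNext` to `rs.next()`, so `hasNext` advances the cursor and is not idempotent. That is safe under `toSeq`, which calls `hasNext` exactly once per `next()`, but a second `hasNext` call would silently skip a row. A plain loop avoids the caveat; a hedged sketch (the `collectColumn` helper is hypothetical, not part of this PR):

```scala
import java.sql.ResultSet

import scala.collection.mutable.ArrayBuffer

object ResultSetHelpers {
  // Drain one string column of a ResultSet into an immutable Seq,
  // advancing the cursor exactly once per row.
  def collectColumn(rs: ResultSet, column: String): Seq[String] = {
    val buf = ArrayBuffer.empty[String]
    while (rs.next()) {
      buf += rs.getString(column)
    }
    buf.toSeq
  }
}
```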