Поделиться через


Примеры кода для Databricks Connect для Python

Примечание.

В этой статье рассматриваются Databricks Connect для Databricks Runtime 13.3 LTS и более поздних версий.

В этой статье приведены примеры кода, использующие Databricks Connect для Python. Databricks Connect позволяет подключать популярные идентификаторы, серверы записных книжек и пользовательские приложения к кластерам Azure Databricks. См. раздел "Что такое Databricks Connect?". Сведения о версии Scala этой статьи см . в примерах кода для Databricks Connect для Scala.

Примечание.

Прежде чем начать использовать Databricks Connect, необходимо настроить клиент Databricks Connect.

Databricks предоставляет несколько дополнительных примеров приложений, демонстрирующих использование Databricks Connect. Ознакомьтесь с примерами приложений для репозитория Databricks Connect в GitHub, в частности:

Вы также можете использовать следующие простые примеры кода для экспериментов с Databricks Connect. В этих примерах предполагается, что для настройки клиента Databricks Connect используется проверка подлинности по умолчанию.

Этот простой пример кода запрашивает указанную таблицу, а затем отображает первые 5 строк указанной таблицы. Чтобы использовать другую таблицу, настройте вызов spark.read.table.

from databricks.connect import DatabricksSession

spark = DatabricksSession.builder.getOrCreate()

df = spark.read.table("samples.nyctaxi.trips")
df.show(5)

В этом более длинном примере кода выполняется следующее:

  1. Создает кадр данных в памяти.
  2. Создает таблицу с именем zzz_demo_temps_table в схеме default . Если таблица с этим именем уже существует, сначала удаляется таблица. Чтобы использовать другую схему или таблицу, настройте вызовы spark.sqlили temps.write.saveAsTableоба.
  3. Сохраняет содержимое кадра данных в таблицу.
  4. SELECT Выполняет запрос к содержимому таблицы.
  5. Отображает результат запроса.
  6. Удаляет таблицу.
from databricks.connect import DatabricksSession
from pyspark.sql.types import *
from datetime import date

spark = DatabricksSession.builder.getOrCreate()

# Create a Spark DataFrame consisting of high and low temperatures
# by airport code and date.
schema = StructType([
  StructField('AirportCode', StringType(), False),
  StructField('Date', DateType(), False),
  StructField('TempHighF', IntegerType(), False),
  StructField('TempLowF', IntegerType(), False)
])

data = [
  [ 'BLI', date(2021, 4, 3), 52, 43],
  [ 'BLI', date(2021, 4, 2), 50, 38],
  [ 'BLI', date(2021, 4, 1), 52, 41],
  [ 'PDX', date(2021, 4, 3), 64, 45],
  [ 'PDX', date(2021, 4, 2), 61, 41],
  [ 'PDX', date(2021, 4, 1), 66, 39],
  [ 'SEA', date(2021, 4, 3), 57, 43],
  [ 'SEA', date(2021, 4, 2), 54, 39],
  [ 'SEA', date(2021, 4, 1), 56, 41]
]

temps = spark.createDataFrame(data, schema)

# Create a table on the Databricks cluster and then fill
# the table with the DataFrame's contents.
# If the table already exists from a previous run,
# delete it first.
spark.sql('USE default')
spark.sql('DROP TABLE IF EXISTS zzz_demo_temps_table')
temps.write.saveAsTable('zzz_demo_temps_table')

# Query the table on the Databricks cluster, returning rows
# where the airport code is not BLI and the date is later
# than 2021-04-01. Group the results and order by high
# temperature in descending order.
df_temps = spark.sql("SELECT * FROM zzz_demo_temps_table " \
  "WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " \
  "GROUP BY AirportCode, Date, TempHighF, TempLowF " \
  "ORDER BY TempHighF DESC")
df_temps.show()

# Results:
#
# +-----------+----------+---------+--------+
# |AirportCode|      Date|TempHighF|TempLowF|
# +-----------+----------+---------+--------+
# |        PDX|2021-04-03|       64|      45|
# |        PDX|2021-04-02|       61|      41|
# |        SEA|2021-04-03|       57|      43|
# |        SEA|2021-04-02|       54|      39|
# +-----------+----------+---------+--------+

# Clean up by deleting the table from the Databricks cluster.
spark.sql('DROP TABLE zzz_demo_temps_table')

Примечание.

В следующем примере описывается, как писать код, переносимый между Databricks Connect для Databricks Runtime 13.3 LTS и выше в средах, где DatabricksSession класс недоступен.

В следующем примере используется DatabricksSession класс или SparkSession используется класс, если DatabricksSession класс недоступен, для запроса указанной таблицы и возврата первых 5 строк. В этом примере используется SPARK_REMOTE переменная среды для проверки подлинности.

from pyspark.sql import SparkSession, DataFrame

def get_spark() -> SparkSession:
  try:
    from databricks.connect import DatabricksSession
    return DatabricksSession.builder.getOrCreate()
  except ImportError:
    return SparkSession.builder.getOrCreate()

def get_taxis(spark: SparkSession) -> DataFrame:
  return spark.read.table("samples.nyctaxi.trips")

get_taxis(get_spark()).show(5)