Поделиться через


Оптимизация запросов структурированной потоковой передачи с отслеживанием состояния

Управление сведениями о промежуточном состоянии для запросов структурированной потоковой передачи с отслеживанием состояния позволяет предотвратить непредвиденные задержки и проблемы в рабочей среде.

Databricks рекомендует следующее:

  • Используйте оптимизированные для вычислений экземпляры в качестве рабочих ролей.
  • Задайте количество секций перемешивания в 1–2 раза больше количества ядер в кластере.
  • Задайте для конфигурации spark.sql.streaming.noDataMicroBatches.enabled значение false в SparkSession. Это не допускает обработку микропакетов, которые не содержат данных, в обработчике микропакетов. Обратите внимание, что установка для этой конфигурации значения false может привести к тому, что операции с отслеживанием состояния, использующие пределы или время ожидания обработки, не будут получать выходные данные до тех пор, пока новые данные не будут получены, а не сразу.

Databricks рекомендует использовать RocksDB с журналом изменений проверка, чтобы управлять состоянием потоков с отслеживанием состояния. См. статью Настройка хранилища состояний RocksDB в Azure Databricks.

Примечание.

Схему управления состоянием нельзя изменить между перезапусками запросов. То есть если запрос был запущен с управлением по умолчанию, то он не может быть изменен без запуска запроса с нуля с новым расположением контрольной точки.

Работа с несколькими операторами с отслеживанием состояния в структурированной потоковой передаче

В Databricks Runtime 13.3 LTS и более поздних версиях Azure Databricks предлагает расширенную поддержку операторов с отслеживанием состояния в структурированных рабочих нагрузках потоковой передачи. Теперь можно объединить несколько операторов с отслеживанием состояния, что означает, что вы можете передать выходные данные операции, например агрегирование окна в другую операцию с отслеживанием состояния, например соединение.

В следующих примерах показано несколько шаблонов, которые можно использовать.

Внимание

При работе с несколькими операторами с отслеживанием состояния существуют следующие ограничения:

  • Функция FlatMapGroupWithState не поддерживается.
  • Поддерживается только режим вывода добавления.

Агрегирование интервала времени в цепочке

Python

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).count()

# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
    window(window_time(windowedCounts.window), "1 hour"),
    windowedCounts.word
).count()

Scala

import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()

// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
  window($"window", "1 hour"),
  $"word"
).count()

Агрегирование периода времени в двух разных потоках, за которым следует соединение окна потоковой передачи

Python

clicksWindow = clicksWithWatermark.groupBy(
  clicksWithWatermark.clickAdId,
  window(clicksWithWatermark.clickTime, "1 hour")
).count()

impressionsWindow = impressionsWithWatermark.groupBy(
  impressionsWithWatermark.impressionAdId,
  window(impressionsWithWatermark.impressionTime, "1 hour")
).count()

clicksWindow.join(impressionsWindow, "window", "inner")

Scala

val clicksWindow = clicksWithWatermark
  .groupBy(window("clickTime", "1 hour"))
  .count()

val impressionsWindow = impressionsWithWatermark
  .groupBy(window("impressionTime", "1 hour"))
  .count()

clicksWindow.join(impressionsWindow, "window", "inner")

Соединение интервала времени потока, за которым следует агрегирование периода времени

Python

joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """),
  "leftOuter"                 # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined.groupBy(
  joined.clickAdId,
  window(joined.clickTime, "1 hour")
).count()

Scala

val joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
  """),
  joinType = "leftOuter"      // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined
  .groupBy($"clickAdId", window($"clickTime", "1 hour"))
  .count()

Перебалансирование состояния для структурированной потоковой передачи

Перебалансирование состояния включается по умолчанию для всех рабочих нагрузок потоковой передачи в разностных динамических таблицах. В Databricks Runtime 11.3 LTS и более поздних версиях можно задать следующий параметр конфигурации в конфигурации кластера Spark, чтобы включить перебалансирование состояния:

spark.sql.streaming.statefulOperator.stateRebalancing.enabled true

Перебалансирование состояния дает преимущества конвейеров структурированной потоковой передачи с отслеживанием состояния, которые проходят события изменения размера кластера. Операции потоковой передачи без отслеживания состояния не получают преимущества независимо от изменения размеров кластера.

Примечание.

Автоматическое масштабирование вычислений имеет ограничения, ограничивающие размер кластера для структурированных рабочих нагрузок потоковой передачи. Databricks рекомендует использовать разностные динамические таблицы (Delta Live Tables) с расширенным автомасштабированием для потоковых рабочих нагрузок. См. статью "Оптимизация использования кластеров конвейеров Delta Live Tables с расширенным автомасштабированием".

События изменения размера кластера вызывают перебалансировку состояния. При перебалансировке событий микропакеты могут иметь более высокую задержку, так как состояние загружается из облачного хранилища в новые исполнители.

Укажите начальное состояние для mapGroupsWithState

Вы можете указать определяемое пользователем начальное состояние для обработки структурированной потоковой передачи с отслеживанием состояния с помощью flatMapGroupsWithState или mapGroupsWithState. Это позволяет избежать повторной обработки данных при запуске потока с отслеживанием состояния без допустимой контрольной точки.

def mapGroupsWithState[S: Encoder, U: Encoder](
    timeoutConf: GroupStateTimeout,
    initialState: KeyValueGroupedDataset[K, S])(
    func: (K, Iterator[V], GroupState[S]) => U): Dataset[U]

def flatMapGroupsWithState[S: Encoder, U: Encoder](
    outputMode: OutputMode,
    timeoutConf: GroupStateTimeout,
    initialState: KeyValueGroupedDataset[K, S])(
    func: (K, Iterator[V], GroupState[S]) => Iterator[U])

Пример варианта использования, который задает начальное состояние для оператора flatMapGroupsWithState:

val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
  val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
  state.update(new RunningCount(count))
  Iterator((key, count.toString))
}

val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
  ("apple", new RunningCount(1)),
  ("orange", new RunningCount(2)),
  ("mango", new RunningCount(5)),
).toDS()

val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)

fruitStream
  .groupByKey(x => x)
  .flatMapGroupsWithState(Update, GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)

Пример варианта использования, который задает начальное состояние для оператора mapGroupsWithState:

val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
  val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
  state.update(new RunningCount(count))
  (key, count.toString)
}

val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
  ("apple", new RunningCount(1)),
  ("orange", new RunningCount(2)),
  ("mango", new RunningCount(5)),
).toDS()

val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)

fruitStream
  .groupByKey(x => x)
  .mapGroupsWithState(GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)

mapGroupsWithState Проверка функции обновления

API TestGroupState позволяет протестировать функцию обновления состояния, используемую для Dataset.groupByKey(...).mapGroupsWithState(...) и Dataset.groupByKey(...).flatMapGroupsWithState(...).

Функция обновления состояния принимает предыдущее состояние в качестве входных данных с помощью объекта типа GroupState. См. справочную документацию по GroupState Apache Spark. Например:

import org.apache.spark.sql.streaming._
import org.apache.spark.api.java.Optional

test("flatMapGroupsWithState's state update function") {
  var prevState = TestGroupState.create[UserStatus](
    optionalState = Optional.empty[UserStatus],
    timeoutConf = GroupStateTimeout.EventTimeTimeout,
    batchProcessingTimeMs = 1L,
    eventTimeWatermarkMs = Optional.of(1L),
    hasTimedOut = false)

  val userId: String = ...
  val actions: Iterator[UserAction] = ...

  assert(!prevState.hasUpdated)

  updateState(userId, actions, prevState)

  assert(prevState.hasUpdated)
}