Поделиться через


Асинхронная контрольная точка состояния для запросов с отслеживанием состояния

Примечание.

Доступно в Databricks Runtime 10.4 LTS и более поздних версиях.

Асинхронная контрольная точка состояния поддерживает точно один раз гарантии для потоковых запросов, но может снизить общую задержку для некоторых рабочих нагрузок с отслеживанием состояния структурированной потоковой передачи, узких мест в обновлениях состояния. Это достигается путем начала обработки следующего микропакета сразу после завершения вычисления предыдущего микропакета, не ожидая завершения контрольной точки состояния. В следующей таблице сравниваются компромиссы по синхронному и асинхронному контрольным точкам.

Characteristic Синхронная контрольная точка асинхронные контрольные точки.
Задержка Более высокая задержка для каждого микропакета. Снижение задержки, так как микропакеты могут перекрываться.
Перезагрузить Быстрое восстановление, так как требуется повторно выполнить только последний пакет. Более высокая задержка перезапуска, чем в микропакете, может потребоваться повторно выполнить.

Ниже приведены характеристики задания потоковой передачи, которые могут воспользоваться асинхронными контрольными точками состояния:

  • Задание имеет одну или несколько операций с отслеживанием состояния (например, агрегирование, flatMapGroupsWithState, mapGroupsWithState, соединения "поток — поток").
  • Задержка контрольной точки состояния является одним из основных факторов, определяющих общую задержку при выполнении пакета. Эти сведения можно найти в событиях StreamingQueryProgress. Эти события также находятся в журналах log4j в драйвере Spark. Ниже приведен пример хода выполнения запросов потоковой передачи и способы определения влияния контрольных точек состояния на общую задержку выполнения пакета.
    • {
         "id" : "2e3495a2-de2c-4a6a-9a8e-f6d4c4796f19",
         "runId" : "e36e9d7e-d2b1-4a43-b0b3-e875e767e1fe",
         "...",
         "batchId" : 0,
         "durationMs" : {
           "...",
           "triggerExecution" : 547730,
           "..."
         },
         "stateOperators" : [ {
           "...",
           "commitTimeMs" : 3186626,
           "numShufflePartitions" : 64,
           "..."
         }]
      }
      
    • Анализ задержки контрольной точки состояния приведенного выше события выполнения запроса

      • Длительность пакета (durationMs.triggerDuration) составляет около 547 сек.
      • Задержка фиксации хранилища состояний (stateOperations[0].commitTimeMs) составляет около 3186 сек. Задержка фиксации агрегирована между задачами, содержащими хранилище состояний. В данном случае имеется 64 таких задачи (stateOperators[0].numShufflePartitions).
      • На каждую задачу, содержащую оператор состояния, потребовалось в среднем 50 секунд (3186/64) для контрольной точки. Это дополнительная задержка, которая повлияла на длительность пакета. При условии, что все 64 задачи выполняются параллельно, шаг с контрольной точкой составляет 9 % (50 сек/547 сек) длительности пакета. Если максимальное число одновременных задач меньше 64, процентное значение становится пропорционально больше.

Включение асинхронного отслеживания контрольных точек

Для асинхронной контрольной точки состояния необходимо использовать хранилище состояний на основе RocksDB. Задайте следующие конфигурации:


spark.conf.set(
  "spark.databricks.streaming.statefulOperator.asyncCheckpoint.enabled",
  "true"
)

spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
)

Ограничения и требования для асинхронного создания контрольных точек

Примечание.

Автоматическое масштабирование вычислений имеет ограничения, ограничивающие размер кластера для структурированных рабочих нагрузок потоковой передачи. Databricks рекомендует использовать разностные динамические таблицы (Delta Live Tables) с расширенным автомасштабированием для потоковых рабочих нагрузок. См. статью "Оптимизация использования кластеров конвейеров Delta Live Tables с расширенным автомасштабированием".

  • Любой сбой в асинхронном отслеживании контрольных точек в одном или нескольких хранилищах приведет к сбою всего запроса. В режиме синхронного отслеживания контрольных точек контрольная точка выполняется как часть задачи, а Spark повторяет выполнение задачи несколько раз, прежде чем отдать запрос. Этот механизм отсутствует в асинхронном отслеживании контрольных точек состояния. Однако при использовании повторных попыток заданияDatabricks после таких сбоев может быть автоматически предпринята повторная попытка.
  • Асинхронное создание контрольных точек оптимально в тех случаях, когда расположения хранилищ состояния не изменяются в промежуток между выполнением микропакетов. Изменение размера кластера в сочетании с асинхронной контрольной точкой состояния может не работать, так как экземпляр хранилища состояний может повторно распространяться по мере добавления или удаления узлов в рамках события изменения размера кластера.
  • Асинхронное отслеживание контрольных точек состояния поддерживается только в реализации поставщика хранилища состояний RocksDB. Используемая по умолчанию реализация хранилища состояний в памяти не поддерживает ее.