Асинхронная контрольная точка состояния для запросов с отслеживанием состояния
Примечание.
Доступно в Databricks Runtime 10.4 LTS и более поздних версиях.
Асинхронная контрольная точка состояния поддерживает точно один раз гарантии для потоковых запросов, но может снизить общую задержку для некоторых рабочих нагрузок с отслеживанием состояния структурированной потоковой передачи, узких мест в обновлениях состояния. Это достигается путем начала обработки следующего микропакета сразу после завершения вычисления предыдущего микропакета, не ожидая завершения контрольной точки состояния. В следующей таблице сравниваются компромиссы по синхронному и асинхронному контрольным точкам.
Characteristic | Синхронная контрольная точка | асинхронные контрольные точки. |
---|---|---|
Задержка | Более высокая задержка для каждого микропакета. | Снижение задержки, так как микропакеты могут перекрываться. |
Перезагрузить | Быстрое восстановление, так как требуется повторно выполнить только последний пакет. | Более высокая задержка перезапуска, чем в микропакете, может потребоваться повторно выполнить. |
Ниже приведены характеристики задания потоковой передачи, которые могут воспользоваться асинхронными контрольными точками состояния:
- Задание имеет одну или несколько операций с отслеживанием состояния (например, агрегирование,
flatMapGroupsWithState
,mapGroupsWithState
, соединения "поток — поток"). - Задержка контрольной точки состояния является одним из основных факторов, определяющих общую задержку при выполнении пакета. Эти сведения можно найти в событиях StreamingQueryProgress. Эти события также находятся в журналах log4j в драйвере Spark. Ниже приведен пример хода выполнения запросов потоковой передачи и способы определения влияния контрольных точек состояния на общую задержку выполнения пакета.
-
{ "id" : "2e3495a2-de2c-4a6a-9a8e-f6d4c4796f19", "runId" : "e36e9d7e-d2b1-4a43-b0b3-e875e767e1fe", "...", "batchId" : 0, "durationMs" : { "...", "triggerExecution" : 547730, "..." }, "stateOperators" : [ { "...", "commitTimeMs" : 3186626, "numShufflePartitions" : 64, "..." }] }
Анализ задержки контрольной точки состояния приведенного выше события выполнения запроса
- Длительность пакета (
durationMs.triggerDuration
) составляет около 547 сек. - Задержка фиксации хранилища состояний (
stateOperations[0].commitTimeMs
) составляет около 3186 сек. Задержка фиксации агрегирована между задачами, содержащими хранилище состояний. В данном случае имеется 64 таких задачи (stateOperators[0].numShufflePartitions
). - На каждую задачу, содержащую оператор состояния, потребовалось в среднем 50 секунд (3186/64) для контрольной точки. Это дополнительная задержка, которая повлияла на длительность пакета. При условии, что все 64 задачи выполняются параллельно, шаг с контрольной точкой составляет 9 % (50 сек/547 сек) длительности пакета. Если максимальное число одновременных задач меньше 64, процентное значение становится пропорционально больше.
- Длительность пакета (
-
Включение асинхронного отслеживания контрольных точек
Для асинхронной контрольной точки состояния необходимо использовать хранилище состояний на основе RocksDB. Задайте следующие конфигурации:
spark.conf.set(
"spark.databricks.streaming.statefulOperator.asyncCheckpoint.enabled",
"true"
)
spark.conf.set(
"spark.sql.streaming.stateStore.providerClass",
"com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
)
Ограничения и требования для асинхронного создания контрольных точек
Примечание.
Автоматическое масштабирование вычислений имеет ограничения, ограничивающие размер кластера для структурированных рабочих нагрузок потоковой передачи. Databricks рекомендует использовать разностные динамические таблицы (Delta Live Tables) с расширенным автомасштабированием для потоковых рабочих нагрузок. См. статью "Оптимизация использования кластеров конвейеров Delta Live Tables с расширенным автомасштабированием".
- Любой сбой в асинхронном отслеживании контрольных точек в одном или нескольких хранилищах приведет к сбою всего запроса. В режиме синхронного отслеживания контрольных точек контрольная точка выполняется как часть задачи, а Spark повторяет выполнение задачи несколько раз, прежде чем отдать запрос. Этот механизм отсутствует в асинхронном отслеживании контрольных точек состояния. Однако при использовании повторных попыток заданияDatabricks после таких сбоев может быть автоматически предпринята повторная попытка.
- Асинхронное создание контрольных точек оптимально в тех случаях, когда расположения хранилищ состояния не изменяются в промежуток между выполнением микропакетов. Изменение размера кластера в сочетании с асинхронной контрольной точкой состояния может не работать, так как экземпляр хранилища состояний может повторно распространяться по мере добавления или удаления узлов в рамках события изменения размера кластера.
- Асинхронное отслеживание контрольных точек состояния поддерживается только в реализации поставщика хранилища состояний RocksDB. Используемая по умолчанию реализация хранилища состояний в памяти не поддерживает ее.