Поделиться через


Мониторинг запросов структурированной потоковой передачи в Azure Databricks

Azure Databricks предоставляет встроенный мониторинг для структурированных приложений потоковой передачи с помощью пользовательского интерфейса Spark на вкладке "Потоковая передача ".

Различение запросов структурированной потоковой передачи в пользовательском интерфейсе Spark

Задайте для потоков уникальное имя запроса, добавив .queryName(<query-name>) в код writeStream, чтобы легко различать, какие метрики принадлежат определенному потоку в пользовательском интерфейсе Spark.

Отправка метрик структурированной потоковой передачи во внешние службы

Метрики потоковой передачи можно отправить во внешние службы для оповещения или использования панелей мониторинга с помощью интерфейса прослушивателя потоковых запросов Apache Spark. В Databricks Runtime 11.3 LTS и более поздних версиях прослушиватель потоковых запросов доступен в Python и Scala.

Внимание

Учетные данные и объекты, управляемые каталогом Unity, нельзя использовать в StreamingQueryListener логике.

Примечание.

Задержка обработки с прослушивателями может значительно повлиять на скорость обработки запросов. Рекомендуется ограничить логику обработки в этих прослушивателях и выбрать запись в системы быстрого реагирования, такие как Kafka для повышения эффективности.

В следующем коде приведены основные примеры синтаксиса для реализации прослушивателя:

Scala

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val myListener = new StreamingQueryListener {

  /**
    * Called when a query is started.
    * @note This is called synchronously with
    *       [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
    *       `onQueryStart` calls on all listeners before
    *       `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
    *        Do not block this method, as it blocks your query.
    */
  def onQueryStarted(event: QueryStartedEvent): Unit = {}

  /**
    * Called when there is some status update (ingestion rate updated, etc.)
    *
    * @note This method is asynchronous. The status in [[StreamingQuery]] returns the
    *       latest status, regardless of when this method is called. The status of [[StreamingQuery]]
    *       may change before or when you process the event. For example, you may find [[StreamingQuery]]
    *       terminates when processing `QueryProgressEvent`.
    */
  def onQueryProgress(event: QueryProgressEvent): Unit = {}

  /**
    * Called when the query is idle and waiting for new data to process.
    */
  def onQueryIdle(event: QueryProgressEvent): Unit = {}

  /**
    * Called when a query is stopped, with or without error.
    */
  def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}

Python

class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        """
        Called when a query is started.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This is called synchronously with
        meth:`pyspark.sql.streaming.DataStreamWriter.start`,
        that is, ``onQueryStart`` will be called on all listeners before
        ``DataStreamWriter.start()`` returns the corresponding
        :class:`pyspark.sql.streaming.StreamingQuery`.
        Do not block in this method as it will block your query.
        """
        pass

    def onQueryProgress(self, event):
        """
        Called when there is some status update (ingestion rate updated, etc.)

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This method is asynchronous. The status in
        :class:`pyspark.sql.streaming.StreamingQuery` returns the
        most recent status, regardless of when this method is called. The status
        of :class:`pyspark.sql.streaming.StreamingQuery`.
        may change before or when you process the event.
        For example, you may find :class:`StreamingQuery`
        terminates when processing `QueryProgressEvent`.
        """
        pass

    def onQueryIdle(self, event):
        """
        Called when the query is idle and waiting for new data to process.
        """
        pass

    def onQueryTerminated(self, event):
        """
        Called when a query is stopped, with or without error.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
            The properties are available as the same as Scala API.
        """
        pass

my_listener = MyListener()

Определение наблюдаемых метрик в структурированной потоковой передаче

Метрики наблюдения именуются произвольными агрегатными функциями, которые можно определить в запросе (DataFrame). Как только выполнение DataFrame завершается (то есть завершает пакетный запрос или начинает потоковую передачу), запускается именованное событие, которое содержит метрики для данных, обработанных с момента последнего завершения.

Можно включить наблюдение за этими метриками, подключив прослушиватель к сеансу Spark. Прослушиватель зависит от режима выполнения:

  • Пакетный режим: используйте QueryExecutionListener.

    QueryExecutionListener вызывается при завершении запроса. Доступ к метрикам осуществляется с помощью сопоставления QueryExecution.observedMetrics.

  • Потоковая передача или микробатч: используйте StreamingQueryListener.

    StreamingQueryListener вызывается при завершении эпохи потоковым запросом. Доступ к метрикам осуществляется с помощью сопоставления StreamingQueryProgress.observedMetrics. В Azure Databricks не поддерживается потоковая передача непрерывного выполнения.

Например:

Scala

// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()

// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    event.progress.observedMetrics.get("my_event").foreach { row =>
      // Trigger if the number of errors exceeds 5 percent
      val num_rows = row.getAs[Long]("rc")
      val num_error_rows = row.getAs[Long]("erc")
      val ratio = num_error_rows.toDouble / num_rows
      if (ratio > 0.05) {
        // Trigger alert
      }
    }
  }
})

Python

# Observe metric
observed_df = df.observe("metric", count(lit(1)).as("cnt"), count(col("error")).as("malformed"))
observed_df.writeStream.format("...").start()

# Define my listener.
class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print(f"'{event.name}' [{event.id}] got started!")
    def onQueryProgress(self, event):
        row = event.progress.observedMetrics.get("metric")
        if row is not None:
            if row.malformed / row.cnt > 0.5:
                print("ALERT! Ouch! there are too many malformed "
                      f"records {row.malformed} out of {row.cnt}!")
            else:
                print(f"{row.cnt} rows processed!")
    def onQueryTerminated(self, event):
        print(f"{event.id} got terminated!")

# Add my listener.
spark.streams.addListener(MyListener())

Метрики объектов StreamingQueryListener

Метрическая Description
id Уникальный идентификатор запроса, который сохраняется во время перезапуска. См . StreamingQuery.id().
runId Идентификатор запроса, уникальный для каждого запуска или перезапуска. См. раздел StreamingQuery.runId().
name Указанное пользователем имя запроса. Имя равно NULL, если имя не указано.
timestamp Метка времени для выполнения микробатча.
batchId Уникальный идентификатор для текущего пакета обрабатываемых данных. В случае повторных попыток после сбоя указанный идентификатор пакетной службы может выполняться несколько раз. Аналогичным образом, если нет данных для обработки, идентификатор пакетной службы не увеличивается.
numInputRows Совокупное (во всех источниках) количество записей, обрабатываемых в триггере.
inputRowsPerSecond Совокупная скорость (во всех источниках) поступающих данных.
processedRowsPerSecond Совокупная скорость (во всех источниках), с которой Spark обрабатывает данные.

Объект durationMs

Сведения о времени, необходимого для выполнения различных этапов процесса выполнения микробатча.

Метрическая Description
durationMs.addBatch Время выполнения микробатча. Это исключает время, необходимое Spark для планирования микробатча.
durationMs.getBatch Время, необходимое для получения метаданных о смещениях из источника.
durationMs.latestOffset Последнее смещение, используемое для микробатча. Этот объект хода выполнения относится к времени, затраченного на получение последнего смещения из источников.
durationMs.queryPlanning Время, затраченное на создание плана выполнения.
durationMs.triggerExecution Время, необходимое для планирования и выполнения микробатча.
durationMs.walCommit Время фиксации новых доступных смещения.

объект eventTime

Сведения о значении времени события, которое видно в данных, обрабатываемых в микробатче. Эти данные используются подложкой, чтобы выяснить, как обрезать состояние для обработки агрегатов с отслеживанием состояния, определенных в задании структурированной потоковой передачи.

Метрическая Description
eventTime.avg Среднее время события, наблюдаемое в этом триггере.
eventTime.max Максимальное время события, наблюдаемое в этом триггере.
eventTime.min Минимальное время события, наблюдаемое в этом триггере.
eventTime.watermark Значение водяного знака, используемого в этом триггере.

Объект stateOperator

Сведения о операциях с отслеживанием состояния, определенных в задании структурированной потоковой передачи, и агрегатах, созданных из них.

Метрическая Description
stateOperators.operatorName Имя оператора с отслеживанием состояния, к которому относятся метрики, например symmetricHashJoin, dedupestateStoreSave.
stateOperators.numRowsTotal Общее количество строк в состоянии в результате оператора с отслеживанием состояния или агрегирования.
stateOperators.numRowsUpdated Общее количество строк, обновленных в состоянии в результате оператора с отслеживанием состояния или агрегирования.
stateOperators.allUpdatesTimeMs Эта метрика в настоящее время не измерима с помощью Spark и планируется удалить в будущих обновлениях.
stateOperators.numRowsRemoved Общее количество строк, удаленных из состояния в результате оператора с отслеживанием состояния или агрегирования.
stateOperators.allRemovalsTimeMs Эта метрика в настоящее время не измерима с помощью Spark и планируется удалить в будущих обновлениях.
stateOperators.commitTimeMs Время фиксации всех обновлений (помещает и удаляет) и возвращает новую версию.
stateOperators.memoryUsedBytes Память, используемая хранилищем состояний.
stateOperators.numRowsDroppedByWatermark Количество строк, которые считаются слишком поздно, которые должны быть включены в агрегирование с отслеживанием состояния. Только агрегаты потоковой передачи: количество строк, удаленных после агрегирования (не необработанных входных строк). Это число не является точным, но дает указание на то, что удалены поздние данные.
stateOperators.numShufflePartitions Количество секций перетасовки для этого оператора с отслеживанием состояния.
stateOperators.numStateStoreInstances Фактический экземпляр хранилища состояний, который оператор инициализировал и поддерживал. Для многих операторов с отслеживанием состояния это то же самое, что и количество секций. Однако поток-поток инициализирует четыре экземпляра хранилища состояний на секцию.

объект stateOperator.customMetrics

Сведения, собранные из RocksDB, записывая метрики о производительности и операциях с учетом значений с отслеживанием состояния, которые он поддерживает для задания структурированной потоковой передачи. Дополнительные сведения см. в статье Настройка хранилища состояний RocksDB в Azure Databricks.

Метрическая Description
customMetrics.rocksdbBytesCopied Количество байтов, скопированных в виде отслеживаемого диспетчером файлов RocksDB.
customMetrics.rocksdbCommitCheckpointLatency Время в миллисекундах, задав моментальный снимок собственного RocksDB и напишите его в локальный каталог.
customMetrics.rocksdbCompactLatency Время сжатия (необязательно) в миллисекундах во время фиксации контрольной точки.
customMetrics.rocksdbCommitFileSyncLatencyMs Время в миллисекундах синхронизации собственного моментального снимка RocksDB с внешним хранилищем (расположение контрольной точки).
customMetrics.rocksdbCommitFlushLatency Время в миллисекундах сброса изменений в памяти RocksDB на локальный диск.
customMetrics.rocksdbCommitPauseLatency Время в миллисекундах останавливает фоновые рабочие потоки в рамках фиксации контрольной точки, например для сжатия.
customMetrics.rocksdbCommitWriteBatchLatency Время в миллисекундах при применении поэтапной записи в структуре памяти (WriteBatch) к native RocksDB.
customMetrics.rocksdbFilesCopied Количество файлов, скопированных в виде отслеживаемого диспетчером файлов RocksDB.
customMetrics.rocksdbFilesReused Количество файлов, повторно используемых диспетчером файлов RocksDB.
customMetrics.rocksdbGetCount Количество вызовов к базе данных (не включается gets из WriteBatch пакета в памяти, используемого get для промежуточной записи).
customMetrics.rocksdbGetLatency Среднее время в наносекундах для базового собственного RocksDB::Get вызова.
customMetrics.rocksdbReadBlockCacheHitCount Количество попаданий кэша из кэша блоков в RocksDB, полезное для предотвращения операций чтения локального диска.
customMetrics.rocksdbReadBlockCacheMissCount Количество кэша блоков в RocksDB не полезно для предотвращения чтения локального диска.
customMetrics.rocksdbSstFileSize Размер всех файлов статической отсортированных таблиц (SST) — табличной структуры RocksDB используется для хранения данных.
customMetrics.rocksdbTotalBytesRead Количество несжатых байтов, считываемых операциями get .
customMetrics.rocksdbTotalBytesReadByCompaction Количество байтов, считываемых процессом сжатия с диска.
customMetrics.rocksdbTotalBytesReadThroughIterator Общее количество байтов несжатых данных, считываемых с помощью итератора. Для некоторых операций с отслеживанием состояния (например, обработка времени ожидания и FlatMapGroupsWithState подложки) требуется чтение данных в базе данных с помощью итератора.
customMetrics.rocksdbTotalBytesWritten Общее количество несжатых байтов, put записанных операциями.
customMetrics.rocksdbTotalBytesWrittenByCompaction Общее количество байтов процесса сжатия записывается на диск.
customMetrics.rocksdbTotalCompactionLatencyMs Время в миллисекундах для уплотнений RocksDB, включая фоновые сжатия и необязательное сжатие, инициированное во время фиксации.
customMetrics.rocksdbTotalFlushLatencyMs Общее время очистки, включая фоновую очистку. Операции очистки — это процессы, с помощью которых MemTable выполняется очистка хранилища после его полного завершения. MemTables — это первый уровень хранения данных в RocksDB.
customMetrics.rocksdbZipFileBytesUncompressed Размер в байтах несжатых ZIP-файлов, как сообщает диспетчер файлов. Диспетчер файлов управляет использованием и удалением физического места на диске SST.

объект sources (Kafka)

Метрическая Description
sources.description Подробное описание источника Kafka, указывающее точный раздел Kafka, считываемый из. Например: “KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]”.
Объект sources.startOffset. Начальный номер смещения в разделе Kafka, в котором запущено задание потоковой передачи.
Объект sources.endOffset. Последнее смещение, обработанное микробатчом. Это может быть равно текущему выполнению latestOffset микробатч.
Объект sources.latestOffset. Последнее смещение, рисуемое микробатчом. Процесс микробатчинга может не обрабатывать все смещения при наличии регулирования, что приводит к endOffset разности.latestOffset
sources.numInputRows Количество входных строк, обработанных из этого источника.
sources.inputRowsPerSecond Скорость, с которой данные приходят для обработки из этого источника.
sources.processedRowsPerSecond Скорость обработки данных spark из этого источника.

объект sources.metrics (Kafka)

Метрическая Description
sources.metrics.avgOffsetsBehindLatest Среднее количество смещения, которое потоковый запрос находится за последним доступным смещением среди всех подписанных разделов.
sources.metrics.estimatedTotalBytesBehindLatest Предполагаемое количество байтов, которые процесс запроса не использовал из подписанных разделов.
sources.metrics.maxOffsetsBehindLatest Максимальное количество смещения, которое запрос потоковой передачи находится за последним доступным смещением среди всех подписанных разделов.
sources.metrics.minOffsetsBehindLatest Минимальное количество смещения, которое запрос потоковой передачи находится за последним доступным смещением среди всех подписанных разделов.

объект приемника (Kafka)

Метрическая Description
sink.description Описание приемника Kafka, в который записывается потоковый запрос, подробное описание используемой реализации приемника Kafka. Например: “org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100”.
sink.numOutputRows Количество строк, записанных в выходную таблицу или приемник в рамках микробатча. В некоторых ситуациях это значение может быть "-1" и, как правило, можно интерпретировать как "неизвестно".

объект sources (Delta Lake)

Метрическая Description
sources.description Описание источника, из которого выполняется потоковой запрос. Например: “DeltaSource[table]”.
sources.[startOffset/endOffset].sourceVersion Версия сериализации, с которой закодировано это смещение.
sources.[startOffset/endOffset].reservoirId Идентификатор считываемой таблицы. Это используется для обнаружения неправильной настройки при перезапуске запроса.
sources.[startOffset/endOffset].reservoirVersion Версия таблицы, которая в настоящее время обрабатывается.
sources.[startOffset/endOffset].index Индекс в последовательности AddFiles этой версии. Используется для разбиения больших фиксаций в несколько пакетов. Этот индекс создается путем сортировки modificationTimestamp и path.
sources.[startOffset/endOffset].isStartingVersion Определяет, отмечает ли текущее смещение начало нового потокового запроса, а не обработку изменений, произошедших после обработки исходных данных. При запуске нового запроса все данные, присутствующие в таблице в начале, обрабатываются сначала, а затем все новые данные, поступающие.
sources.latestOffset Последнее смещение, обработанное запросом microbatch.
sources.numInputRows Количество входных строк, обработанных из этого источника.
sources.inputRowsPerSecond Скорость, с которой данные приходят для обработки из этого источника.
sources.processedRowsPerSecond Скорость обработки данных spark из этого источника.
sources.metrics.numBytesOutstanding Объединенный размер невыполненных файлов (файлы, отслеживаемые RocksDB). Это метрика невыполненной работы для Delta и автозагрузчика в качестве источника потоковой передачи.
sources.metrics.numFilesOutstanding Количество необработанных файлов. Это метрика невыполненной работы для Delta и автозагрузчика в качестве источника потоковой передачи.

объект приемника (Delta Lake)

Метрическая Description
sink.description Описание приемника Delta, подробное описание используемой реализации приемника Delta. Например: “DeltaSink[table]”.
sink.numOutputRows Количество строк всегда равно "-1", так как Spark не может выводить выходные строки для приемников DSv1, что является классификацией приемника Delta Lake.

Примеры

Пример события Kafka to-Kafka StreamingQueryListener

{
  "id" : "3574feba-646d-4735-83c4-66f657e52517",
  "runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
  "name" : "STREAMING_QUERY_NAME_UNIQUE",
  "timestamp" : "2022-10-31T20:09:30.455Z",
  "batchId" : 1377,
  "numInputRows" : 687,
  "inputRowsPerSecond" : 32.13433743393049,
  "processedRowsPerSecond" : 34.067241892293964,
  "durationMs" : {
    "addBatch" : 18352,
    "getBatch" : 0,
    "latestOffset" : 31,
    "queryPlanning" : 977,
    "triggerExecution" : 20165,
    "walCommit" : 342
  },
  "eventTime" : {
    "avg" : "2022-10-31T20:09:18.070Z",
    "max" : "2022-10-31T20:09:30.125Z",
    "min" : "2022-10-31T20:09:09.793Z",
    "watermark" : "2022-10-31T20:08:46.355Z"
  },
  "stateOperators" : [ {
    "operatorName" : "stateStoreSave",
    "numRowsTotal" : 208,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 434,
    "numRowsRemoved" : 76,
    "allRemovalsTimeMs" : 515,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 167069743,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 222,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 165,
      "rocksdbReadBlockCacheMissCount" : 41,
      "rocksdbSstFileSize" : 232729,
      "rocksdbTotalBytesRead" : 12844,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 161238,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "dedupe",
    "numRowsTotal" : 2454744,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 4155,
    "numRowsRemoved" : 0,
    "allRemovalsTimeMs" : 0,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 137765341,
    "numRowsDroppedByWatermark" : 34,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "numDroppedDuplicateRows" : 193,
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 146,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3,
      "rocksdbReadBlockCacheMissCount" : 3,
      "rocksdbSstFileSize" : 78959140,
      "rocksdbTotalBytesRead" : 0,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "symmetricHashJoin",
    "numRowsTotal" : 2583,
    "numRowsUpdated" : 682,
    "allUpdatesTimeMs" : 9645,
    "numRowsRemoved" : 508,
    "allRemovalsTimeMs" : 46,
    "commitTimeMs" : 21,
    "memoryUsedBytes" : 668544484,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 80,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 4218,
      "rocksdbGetLatency" : 3,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3425,
      "rocksdbReadBlockCacheMissCount" : 149,
      "rocksdbSstFileSize" : 742827,
      "rocksdbTotalBytesRead" : 866864,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  } ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
    "startOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706380
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "numInputRows" : 292,
    "inputRowsPerSecond" : 13.65826278123392,
    "processedRowsPerSecond" : 14.479817514628582,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "estimatedTotalBytesBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  }, {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
    "startOffset" : {
      KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
    "numOutputRows" : 76
  }
}

Пример события Delta Lake to-Delta Lake StreamingQueryListener

{
  "id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
  "runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
  "name" : "silverTransformFromBronze",
  "timestamp" : "2022-11-01T18:21:29.500Z",
  "batchId" : 4,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "latestOffset" : 62,
    "triggerExecution" : 62
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
    "startOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "endOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "latestOffset" : null,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "numBytesOutstanding" : "0",
      "numFilesOutstanding" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
    "numOutputRows" : -1
  }
}

Пример события Kinesis to-Delta Lake StreamingQueryListener

{
  "id" : "3ce9bd93-da16-4cb3-a3b6-e97a592783b5",
  "runId" : "fe4a6bda-dda2-4067-805d-51260d93260b",
  "name" : null,
  "timestamp" : "2024-05-14T02:09:20.846Z",
  "batchId" : 0,
  "batchDuration" : 59322,
  "numInputRows" : 20,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.33714304979602844,
  "durationMs" : {
    "addBatch" : 5397,
    "commitBatch" : 4429,
    "commitOffsets" : 211,
    "getBatch" : 5,
    "latestOffset" : 21998,
    "queryPlanning" : 12128,
    "triggerExecution" : 59313,
    "walCommit" : 220
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KinesisV2[KinesisTestUtils-7199466178786508570-at-1715652545256]",
    "startOffset" : null,
    "endOffset" : [ {
      "shard" : {
        "stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
        "shardId" : "shardId-000000000000"
      },
      "firstSeqNum" : "49652022592149344892294981243280420130985816456924495874",
      "lastSeqNum" : "49652022592149344892294981243290091537542733559041622018",
      "closed" : false,
      "msBehindLatest" : "0",
      "lastRecordSeqNum" : "49652022592149344892294981243290091537542733559041622018"
    }, {
      "shard" : {
        "stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
        "shardId" : "shardId-000000000001"
      },
      "firstSeqNum" : "49652022592171645637493511866421955849258464818430476306",
      "lastSeqNum" : "49652022592171645637493511866434045107454611178897014802",
      "closed" : false,
      "msBehindLatest" : "0",
      "lastRecordSeqNum" : "49652022592171645637493511866434045107454611178897014802"
    } ],
    "latestOffset" : null,
    "numInputRows" : 20,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.33714304979602844,
    "metrics" : {
      "avgMsBehindLatest" : "0.0",
      "maxMsBehindLatest" : "0",
      "minMsBehindLatest" : "0",
      "mode" : "efo",
      "numClosedShards" : "0",
      "numProcessedBytes" : "30",
      "numProcessedRecords" : "18",
      "numRegisteredConsumers" : "1",
      "numStreams" : "1",
      "numTotalShards" : "2",
      "totalPrefetchedBytes" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/streaming/test/KinesisToDeltaServerlessLiteSuite/2024-05-14-01-58-14-76eb7e51/56b9426c-3492-4ac5-8fe8-3d00efe20be5/deltaTable]",
    "numOutputRows" : -1
  }
}

Пример события Kafka+Delta Lake to-Delta Lake StreamingQueryListener

{
 "id" : "210f4746-7caa-4a51-bd08-87cabb45bdbe",
 "runId" : "42a2f990-c463-4a9c-9aae-95d6990e63f4",
 "name" : null,
 "timestamp" : "2024-05-15T21:57:50.782Z",
 "batchId" : 0,
 "batchDuration" : 3601,
 "numInputRows" : 20,
 "inputRowsPerSecond" : 0.0,
 "processedRowsPerSecond" : 5.55401277422938,
 "durationMs" : {
  "addBatch" : 1544,
  "commitBatch" : 686,
  "commitOffsets" : 27,
  "getBatch" : 12,
  "latestOffset" : 577,
  "queryPlanning" : 105,
  "triggerExecution" : 3600,
  "walCommit" : 34
 },
 "stateOperators" : [ {
  "operatorName" : "symmetricHashJoin",
  "numRowsTotal" : 20,
  "numRowsUpdated" : 20,
  "allUpdatesTimeMs" : 473,
  "numRowsRemoved" : 0,
  "allRemovalsTimeMs" : 0,
  "commitTimeMs" : 277,
  "memoryUsedBytes" : 13120,
  "numRowsDroppedByWatermark" : 0,
  "numShufflePartitions" : 5,
  "numStateStoreInstances" : 20,
  "customMetrics" : {
   "loadedMapCacheHitCount" : 0,
   "loadedMapCacheMissCount" : 0,
   "stateOnCurrentVersionSizeBytes" : 5280
  }
 } ],
 "sources" : [ {
  "description" : "KafkaV2[Subscribe[topic-1]]",
  "startOffset" : null,
  "endOffset" : {
   "topic-1" : {
    "1" : 5,
    "0" : 5
   }
  },
  "latestOffset" : {
   "topic-1" : {
    "1" : 5,
    "0" : 5
   }
  },
  "numInputRows" : 10,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 2.77700638711469,
  "metrics" : {
   "avgOffsetsBehindLatest" : "0.0",
   "estimatedTotalBytesBehindLatest" : "0.0",
   "maxOffsetsBehindLatest" : "0",
   "minOffsetsBehindLatest" : "0"
  }
 }, {
  "description" : "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
  "startOffset" : null,
  "endOffset" : {
   "sourceVersion" : 1,
   "reservoirId" : "b207a1cd-0fbe-4652-9c8f-e5cc467ae84f",
   "reservoirVersion" : 1,
   "index" : -1,
   "isStartingVersion" : false
  },
  "latestOffset" : null,
  "numInputRows" : 10,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 2.77700638711469,
  "metrics" : {
   "numBytesOutstanding" : "0",
   "numFilesOutstanding" : "0"
  }
 } ],
 "sink" : {
  "description" : "DeltaSink[/tmp/spark-d445c92a-4640-4827-a9bd-47246a30bb04]",
  "numOutputRows" : -1
 }
}

Пример источника скорости в событие Delta Lake StreamingQueryListener

{
  "id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
  "runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
  "name" : "dataGen",
  "timestamp" : "2022-11-01T18:28:20.332Z",
  "batchId" : 279,
  "numInputRows" : 300,
  "inputRowsPerSecond" : 114.15525114155251,
  "processedRowsPerSecond" : 158.9825119236884,
  "durationMs" : {
    "addBatch" : 1771,
    "commitOffsets" : 54,
    "getBatch" : 0,
    "latestOffset" : 0,
    "queryPlanning" : 4,
    "triggerExecution" : 1887,
    "walCommit" : 58
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
    "startOffset" : 560,
    "endOffset" : 563,
    "latestOffset" : 563,
    "numInputRows" : 300,
    "inputRowsPerSecond" : 114.15525114155251,
    "processedRowsPerSecond" : 158.9825119236884
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
    "numOutputRows" : -1
  }
}