Поделиться через


Разностный формат в Фабрике данных Azure

ОБЛАСТЬ ПРИМЕНЕНИЯ: Фабрика данных Azure Azure Synapse Analytics

Совет

Попробуйте использовать фабрику данных в Microsoft Fabric, решение для аналитики с одним интерфейсом для предприятий. Microsoft Fabric охватывает все, от перемещения данных до обработки и анализа данных в режиме реального времени, бизнес-аналитики и отчетности. Узнайте, как бесплатно запустить новую пробную версию !

В этой статье объясняется, как копировать данные в озеро разностных данных и обратно при использовании в Azure Data Lake Store 2-го поколения или Хранилища BLOB-объектов Azure с помощью разностного формата. Этот соединитель доступен как встроенный набор данных в потоке сопоставления данных в качестве источника и приемника.

Свойства потока данных для сопоставления

Этот соединитель доступен как встроенный набор данных в потоке сопоставления данных в качестве источника и приемника.

Свойства источника

В таблице, приведенной ниже, указаны свойства, поддерживаемые источником разностных данных. Изменить эти свойства можно на вкладке Source options (Параметры источника).

Имя Описание Обязательное поле Допустимые значения Свойство скрипта для потока данных
Формат Формат должен быть delta yes delta format
Файловая система Контейнер/файловая система озера разностных данных yes Строка fileSystem
Folder path Каталог разностного озера yes Строка folderPath
Тип сжатия Тип сжатия таблицы разностных данных no bzip2
gzip
deflate
ZipDeflate
snappy
lz4
compressionType
Compression level Выберите приоритет: максимально быстрое сжатие или оптимальное сжатие. Обязателен, если указан ключ compressedType. Optimal или Fastest compressionLevel
Переход по времени Выберите, следует ли запрашивать старый моментальный снимок таблицы разностных данных no Запрос по метке времени: метка времени
Запрос по версии: целое число
метка времениAsOf
versionAsOf
Allow no files found (Разрешить ненайденные файлы) Если значение true, ошибка не возникает, если файлы не найдены no true или false ignoreNoFilesFound

Импорт схемы

Разностные данные доступны только в качестве встроенного набора данных и по умолчанию не имеют связанной схемы. Чтобы получить метаданные столбца, нажмите кнопку "Импорт схемы" на вкладке "Проекция". Это позволяет ссылаться на имена столбцов и типы данных, указанные в корпусе. Чтобы импортировать схему, сеанс отладки потока данных должен быть активным, и для указания необходимо указать существующий файл определения сущности CDM.

Пример сценария источника разностных данных

source(output(movieId as integer,
            title as string,
            releaseDate as date,
            rated as boolean,
            screenedOn as timestamp,
            ticketPrice as decimal(10,2)
            ),
    store: 'local',
    format: 'delta',
    versionAsOf: 0,
    allowSchemaDrift: false,
    folderPath: $tempPath + '/delta'
  ) ~> movies

Свойства приемника

В таблице, приведенной ниже, указаны свойства, поддерживаемые приемником разностных данных. Эти свойства можно изменить на вкладке Параметры.

Имя Описание Обязательное поле Допустимые значения Свойство скрипта для потока данных
Формат Формат должен быть delta yes delta format
Файловая система Контейнер/файловая система озера разностных данных yes Строка fileSystem
Folder path Каталог разностного озера yes Строка folderPath
Тип сжатия Тип сжатия таблицы разностных данных no bzip2
gzip
deflate
ZipDeflate
snappy
lz4
TarGZip
tar
compressionType
Compression level Выберите приоритет: максимально быстрое сжатие или оптимальное сжатие. Обязателен, если указан ключ compressedType. Optimal или Fastest compressionLevel
Vacuum Удаляет файлы старше указанной длительности, которая больше не относится к текущей версии таблицы. Если указано значение 0 или меньше, операция вакуума не выполняется. yes Целое vacuum
Действие таблицы Сообщает ADF, что делать с целевой таблицей Delta в приемнике. Вы можете оставить все как есть и добавить новые строки, перезаписать существующее определение таблицы и данные новыми метаданными и данными или сохранить существующую структуру таблицы, но сначала усечь все строки, а затем вставить новые строки. no Ничего, Усечь, Перезаписать deltaTruncate, перезапись
Метод обновления При нажатии кнопки "Разрешить вставку" отдельно или при записи в новую разностную таблицу целевой объект получает все входящие строки независимо от набора политик строк. Если данные содержат строки других политик строк, их необходимо исключить с помощью предыдущего преобразования фильтра.

При выборе всех методов обновления выполняется слияние, где строки вставляются, удаляются или обновляются в соответствии с набором политик строк, используя предыдущее преобразование Alter Row.
yes true или false Вставляемый
deletable
upsertable
updateable
Оптимизированная запись Повышение пропускной способности для операции записи с помощью оптимизации внутреннего случайного перемешивания в исполнителях Spark. В результате разделов и файлов может быть меньше, но они могут иметь больший размер. no true или false optimizedWrite: true
Автоматическое сжатие После завершения операции записи Spark автоматически выполнит команду OPTIMIZE для реорганизации данных, в результате чего при необходимости появится больше разделов для повышения производительности чтения в будущем. no true или false autoCompact: true

Пример скрипта приемника разностных данных

Связанный скрипта потока данных:

moviesAltered sink(
          input(movieId as integer,
                title as string
            ),
           mapColumn(
                movieId,
                title
            ),
           insertable: true,
           updateable: true,
           deletable: true,
           upsertable: false,
           keys: ['movieId'],
            store: 'local',
           format: 'delta',
           vacuum: 180,
           folderPath: $tempPath + '/delta'
           ) ~> movieDB

Приемник изменений с удалением секций

С помощью этого варианта в пункте "Метод Update" выше (т. е. update/upsert/delete) можно ограничить количество проверяемых секций. Из целевого хранилища извлекается только секции, удовлетворяющие этому условию. Вы можете указать фиксированный набор значений, которые может принимать столбец секции.

Снимок экрана: параметры удаления секции для ограничения проверки.

Пример скрипта приемника изменений с удалением секций

Пример скрипта приведен ниже.

DerivedColumn1 sink( 
      input(movieId as integer,
            title as string
           ), 
      allowSchemaDrift: true,
      validateSchema: false,
      format: 'delta',
      container: 'deltaContainer',
      folderPath: 'deltaPath',
      mergeSchema: false,
      autoCompact: false,
      optimizedWrite: false,
      vacuum: 0,
      deletable:false,
      insertable:true,
      updateable:true,
      upsertable:false,
      keys:['movieId'],
      pruneCondition:['part_col' -> ([5, 8])],
      skipDuplicateMapInputs: true,
      skipDuplicateMapOutputs: true) ~> sink2
 

Delta будет считывать только 2 секции, где part_col == 5 и 8, из целевого разностного хранилища (вместо всех секций). part_col — это столбец, по которому секционированы целевые разностные данные. Ему не нужно присутствовать в исходных данных.

Варианты оптимизации приемника изменений

На вкладке "Параметры" вы найдете три дополнительных параметра для оптимизации преобразования разностного приемника.

  • Если включен параметр схемы слияния, он разрешает эволюцию схемы , т. е. любые столбцы, входящие в текущий поток, но не в целевой таблице Delta, автоматически добавляются в ее схему. Этот параметр поддерживается во всех методах обновления.

  • Если автоматическое сжатие включено, после отдельной операции записи преобразование проверяет возможность дополнительного сжатия файлов и запускает задание быстрой оптимизации (с размером файлов 128 МБ вместо 1 ГБ), чтобы еще больше сжать файлы для секций с наибольшим числом файлов малого размера. Автоматическое сжатие помогает объединить большое количество файлов малого размера в несколько крупных файлов. Автоматическое сжатие выполняется только при наличии не менее 50 файлов. После выполнения операции сжатия она создает новую версию таблицы и записывает новый файл с данными нескольких предыдущих файлов в сжатом виде.

  • Если включена оптимизация записи, преобразование приемника динамически оптимизирует размеры секций с учетом фактических данных, пытаясь записывать файлы размером 128 МБ для каждой секции таблицы. Это приблизительный размер, который может различаться в зависимости от характеристик набора данных. Оптимизированные операции записи повышают общую эффективность операций записи и последующих операций чтения. Он упорядочивает секции таким образом, чтобы повысить производительность последующих операций чтения.

Совет

Процесс оптимизированной записи замедлит все задание извлечения, преобразования и загрузки, так как приемник выдаст команду Spark Delta Lake Optimize после обработки данных. Рекомендуется не злоупотреблять оптимизированной записью. Например, если у вас есть почасовой конвейер данных, выполняйте поток данных с оптимизированной записью ежедневно.

Известные ограничения

При записи в приемник разностных данных существует известное ограничение, в котором количество строк, записанных, не будет отображаться в выходных данных мониторинга.