Поделиться через


Использование планировщиков

Планировщик управляет запуском подписки и публикацией уведомлений. Он состоит из трех компонентов. Сначала это структура данных. При планировании выполнения задач они помещаются в планировщик для постановки в очередь на основе приоритета или других критериев. Он также предлагает контекст выполнения, который указывает, где выполняется задача (например, в пуле потоков, текущем потоке или в другом домене приложения). Наконец, у него есть часы, которые предоставляют понятие времени для себя (путем доступа к свойству Now планировщика). Задачи, запланированные для определенного планировщика, будут соответствовать времени, обозначаемого только этим временем.

Планировщики также вводят понятие виртуального времени (обозначаемое типом VirtualScheduler), которое не соответствует реальному времени, используемому в нашей повседневной жизни. Например, последовательность, которая занимает 100 лет, может быть запланирована на выполнение в виртуальное время всего за 5 минут. Это будет описано в разделе Тестирование и отладка наблюдаемых последовательностей .

Типы планировщика

Все типы планировщиков, предоставляемые Rx, реализуют интерфейс IScheduler . Каждый из них можно создать и вернуть с помощью статических свойств типа Планировщик. ImmediateScheduler (путем доступа к статическому свойству Интерпретация) немедленно запустит указанное действие. CurrentThreadScheduler (путем доступа к статическому свойству CurrentThread) запланировало выполнение действий в потоке, который выполняет исходный вызов. Действие выполняется не сразу, а помещается в очередь и выполняется только после завершения текущего действия. DispatcherScheduler (путем доступа к статическому свойству Dispatcher) будет планировать действия с текущим диспетчером, что полезно для разработчиков Silverlight, использующих Rx. Затем указанные действия делегируются методу Dispatcher.BeginInvoke() в Silverlight. NewThreadScheduler (путем доступа к статическому свойству NewThread ) планирует действия в новом потоке и оптимально подходит для планирования длительных или блокирующих действий. TaskPoolScheduler (путем доступа к статическому свойству TaskPool ) планирует действия в определенной фабрике задач. ThreadPoolScheduler (путем доступа к статичному свойству ThreadPool ) планирует действия в пуле потоков. Оба планировщика пула оптимизированы для краткосрочных действий.

Использование планировщиков

Возможно, вы уже использовали планировщики в коде Rx без явного указания типа используемых планировщиков. Это связано с тем, что все наблюдаемые операторы, которые имеют дело с параллелизмом, имеют несколько перегрузок. Если вы не используете перегрузку, которая принимает планировщик в качестве аргумента, Rx выберет планировщик по умолчанию, используя принцип наименьшего параллелизма. Это означает, что выбирается планировщик, который вводит наименьший объем параллелизма, удовлетворяющий потребностям оператора.  Например, для операторов, возвращающих наблюдаемый объект с ограниченным и небольшим количеством сообщений, Rx вызывает Интерпретация.  Для операторов, возвращающих потенциально большое или бесконечное количество сообщений, вызывается Метод CurrentThread . Для операторов, использующих таймеры, используется ThreadPool .

Так как Rx использует планировщик с наименьшим параллелизмом, можно выбрать другой планировщик, если требуется реализовать параллелизм для повышения производительности или если возникла проблема с сходством потоков.  Например, если вы не хотите блокировать определенный поток, в данном случае следует использовать ThreadPool.  Примером последнего является то, что, когда требуется запустить таймер в пользовательском интерфейсе, в этом случае следует использовать Dispatcher. Чтобы указать конкретный планировщик, можно использовать перегрузки операторов, которые принимают планировщик, например Timer(TimeSpan.FromSeconds(10), Scheduler.DispatcherScheduler()).

В следующем примере исходная наблюдаемая последовательность создает значения в бешеном темпе. Перегрузка по умолчанию оператора Timer помещает сообщения OnNext в ThreadPool.

Observable.Timer(Timespan.FromSeconds(0.01))
          .Subscribe(…);

Это приведет к быстрой постановке наблюдателя в очередь. Мы можем улучшить этот код с помощью оператора ObserveOn, который позволяет указать контекст, который вы хотите использовать для отправки push-уведомлений (OnNext) наблюдателям. По умолчанию оператор ObserveOn гарантирует, что OnNext будет вызываться как можно больше раз в текущем потоке. Вы можете использовать его перегрузки и перенаправлять выходные данные OnNext в другой контекст. Кроме того, оператор SubscribeOn можно использовать для возврата наблюдаемого прокси-сервера, который делегирует действия определенному планировщику. Например, для приложения с интенсивным использованием пользовательского интерфейса можно делегировать все фоновые операции, выполняемые с планировщиком, работающим в фоновом режиме, с помощью команды SubscribeOn и передачи ему ThreadPoolScheduler. Чтобы получать отправляемые уведомления и получать доступ к любому элементу пользовательского интерфейса, можно передать экземпляр DispatcherScheduler оператору ObserveOn.

В следующем примере показано, как запланировать все уведомления OnNext в текущем диспетчере, чтобы в потоке пользовательского интерфейса отправлялось любое вытесненное значение. Это особенно полезно для разработчиков Silverlight, использующих Rx.

Observable.Timer(Timespan.FromSeconds(0.01))
          .ObserveOn(Scheduler.DispatcherScheduler)
          .Subscribe(…);

Вместо использования оператора ObservOn для изменения контекста выполнения, в котором наблюдаемая последовательность создает сообщения, мы можем создать параллелизм в нужном месте для начала. Так как операторы параметризируют введение параллелизма, предоставляя перегрузку аргумента планировщика, передача правильного планировщика приведет к уменьшению числа мест, где необходимо использовать оператор ObserveOn. Например, можно разблокировать наблюдатель и подписаться на поток пользовательского интерфейса напрямую, изменив планировщик, используемый источником, как показано в следующем примере. В этом коде с помощью перегрузки таймера, которая принимает планировщик и предоставляет Scheduler.Dispatcher экземпляр, все значения, вытесненные из этой наблюдаемой последовательности, будут поступать в поток пользовательского интерфейса.

Observable.Timer(Timespan.FromSeconds(0.01), Scheduler.DispatcherScheduler)
          .Subscribe(…);

Следует также отметить, что с помощью оператора ObservOn запланировано действие для каждого сообщения, которое поступает через исходную наблюдаемую последовательность. Это потенциально изменяет информацию о времени, а также ставит дополнительную нагрузку на систему. Если у вас есть запрос, который состоит из различных наблюдаемых последовательностей, выполняемых в различных контекстах выполнения, и вы выполняете фильтрацию в запросе, лучше поместить ObservOn позже в запрос. Это связано с тем, что запрос потенциально отфильтрует большое количество сообщений, а размещение оператора ObserveOn ранее в запросе будет выполнять дополнительную работу с сообщениями, которые в любом случае будут отфильтрованы. Вызов оператора ObserveOn в конце запроса приведет к наименьшим последствиям для производительности.

Еще одно преимущество явного указания типа планировщика заключается в том, что вы можете ввести параллелизм для повышения производительности, как показано в следующем коде.

seq.GroupBy(...)
        .Select(x=>x.ObserveOn(Scheduler.NewThread))
        .Select(x=>expensive(x))  // perform operations that are expensive on resources