Поделиться через


Фрагментирование канала

В примере ChunkingChannel показано, как можно использовать настраиваемый протокол или многоуровневый канал для выполнения фрагментирования и расшифровки произвольных больших сообщений.

При отправке больших сообщений с помощью Windows Communication Foundation (WCF) часто рекомендуется ограничить объем памяти, используемой для буферизации этих сообщений. Одним из возможных решений является потоковая передача тела сообщения (предполагая, что основной объем данных содержится в теле сообщения). Однако для некоторых протоколов требуется буферизация всего сообщения. Надежный обмен сообщениями и безопасность являются двумя такими примерами. Другим возможным решением является разделение большого сообщения на маленькие, называемые фрагментами, и отправка этих фрагментов поочередно с последующим восстановлением большого сообщения на получающей стороне. Приложение само по себе может выполнять фрагментацию и дефрагментацию или использовать для этого пользовательский канал.

Фрагментация всегда должна применяться только после полного формирования сообщения для отправки. Канал фрагментации всегда должен располагаться под каналом безопасности и каналом надежного сеанса.

Примечание.

Процедура настройки и инструкции по построению для данного образца приведены в конце этого раздела.

Предположения и ограничения метода сегментации канала

Структура сообщения

Предполагается, что фрагментируемые сообщения, передаваемые в канал фрагментации, имеют следующую структуру.

<soap:Envelope>
  <!-- headers -->
  <soap:Body>
    <operationElement>
      <paramElement>data to be chunked</paramElement>
    </operationElement>
  </soap:Body>
</soap:Envelope>

При использовании ServiceModel операции контракта с одним входным параметром соответствуют этой форме входных сообщений. Аналогичным образом, выходные сообщения операций контракта, в которых имеется один выходной параметр или одно возвращаемое значение, соответствуют этой форме сообщения. Ниже приводятся примеры таких операций.

[ServiceContract]
interface ITestService
{
    [OperationContract]
    Stream EchoStream(Stream stream);

    [OperationContract]
    Stream DownloadStream();

    [OperationContract(IsOneWay = true)]
    void UploadStream(Stream stream);
}

Сеансы

Требуется, чтобы сообщения доставлялись в канал фрагментации ровно один раз в порядке доставки сообщений (фрагментов). Это означает, что базовый стек канала должен быть сеансовым. Сеансы могут предоставляться с помощью транспорта (например, TCP) или протокольного канала с поддержкой сеансов (например, канала ReliableSession).

Асинхронная отправка и получение

Методы асинхронной отправки и получения не реализованы в данной версии примера канала сегментации.

Протокол фрагментации

Канал фрагментации определяет протокол, указывающий начало и конец последовательностей фрагментов, а также порядковый номер каждого фрагмента. В следующих трех примерах сообщений показаны исходное сообщение, фрагментированное сообщение и конечное сообщение с комментариями, описывающими основные аспекты.

Исходное сообщение

<s:Envelope xmlns:a="http://www.w3.org/2005/08/addressing"
            xmlns:s="http://www.w3.org/2003/05/soap-envelope">
  <s:Header>
<!--Original message action is replaced with a chunking-specific action. -->
    <a:Action s:mustUnderstand="1">http://samples.microsoft.com/chunkingAction</a:Action>
<!--
Original message is assigned a unique id that is transmitted
in a MessageId header. Note that this is different from the WS-Addressing MessageId header.
-->
    <MessageId s:mustUnderstand="1" xmlns="http://samples.microsoft.com/chunking">
53f183ee-04aa-44a0-b8d3-e45224563109
</MessageId>
<!--
ChunkingStart header signals the start of a chunked message.
-->
    <ChunkingStart s:mustUnderstand="1" i:nil="true" xmlns:i="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://samples.microsoft.com/chunking" />
<!--
Original message action is transmitted in OriginalAction.
This is required to re-create the original message on the other side.
-->
    <OriginalAction xmlns="http://samples.microsoft.com/chunking">
http://tempuri.org/ITestService/EchoStream
    </OriginalAction>
   <!--
    All original message headers are included here.
   -->
  </s:Header>
  <s:Body>
<!--
Chunking assumes this structure of Body content:
<element>
  <childelement>large data to be chunked<childelement>
</element>
The start message contains just <element> and <childelement> without
the data to be chunked.
-->
    <EchoStream xmlns="http://tempuri.org/">
      <stream />
    </EchoStream>
  </s:Body>
</s:Envelope>

Фрагментированное сообщение

<s:Envelope
  xmlns:a="http://www.w3.org/2005/08/addressing"
  xmlns:s="http://www.w3.org/2003/05/soap-envelope">
  <s:Header>
   <!--
    All chunking protocol messages have this action.
   -->
    <a:Action s:mustUnderstand="1">
      http://samples.microsoft.com/chunkingAction
    </a:Action>
<!--
Same as MessageId in the start message. The GUID indicates which original message this chunk belongs to.
-->
    <MessageId s:mustUnderstand="1"
               xmlns="http://samples.microsoft.com/chunking">
      53f183ee-04aa-44a0-b8d3-e45224563109
    </MessageId>
<!--
The sequence number of the chunk.
This number restarts at 1 with each new sequence of chunks.
-->
    <ChunkNumber s:mustUnderstand="1"
                 xmlns="http://samples.microsoft.com/chunking">
      1096
    </ChunkNumber>
  </s:Header>
  <s:Body>
<!--
The chunked data is wrapped in a chunk element.
The encoding of this data (and the entire message)
depends on the encoder used. The chunking channel does not mandate an encoding.
-->
    <chunk xmlns="http://samples.microsoft.com/chunking">
kfSr2QcBlkHTvQ==
    </chunk>
  </s:Body>
</s:Envelope>

Конечное сообщение

<s:Envelope xmlns:a="http://www.w3.org/2005/08/addressing"
            xmlns:s="http://www.w3.org/2003/05/soap-envelope">
  <s:Header>
    <a:Action s:mustUnderstand="1">
      http://samples.microsoft.com/chunkingAction
    </a:Action>
<!--
Same as MessageId in the start message. The GUID indicates which original message this chunk belongs to.
-->
    <MessageId s:mustUnderstand="1"
               xmlns="http://samples.microsoft.com/chunking">
      53f183ee-04aa-44a0-b8d3-e45224563109
    </MessageId>
<!--
ChunkingEnd header signals the end of a chunk sequence.
-->
    <ChunkingEnd s:mustUnderstand="1" i:nil="true"
                 xmlns:i="http://www.w3.org/2001/XMLSchema-instance"
                 xmlns="http://samples.microsoft.com/chunking" />
<!--
ChunkingEnd messages have a sequence number.
-->
    <ChunkNumber s:mustUnderstand="1"
                 xmlns="http://samples.microsoft.com/chunking">
      79
    </ChunkNumber>
  </s:Header>
  <s:Body>
<!--
The ChunkingEnd message has the same <element><childelement> structure
as the ChunkingStart message.
-->
    <EchoStream xmlns="http://tempuri.org/">
      <stream />
    </EchoStream>
  </s:Body>
</s:Envelope>

Канальная архитектура сегментации

Канал сегментации — это канал IDuplexSessionChannel, который в общих чертах следует типичной архитектуре каналов. ChunkingBindingElement — это устройство, которое может построить ChunkingChannelFactory и ChunkingChannelListener. ChunkingChannelFactory создает экземпляры ChunkingChannel при соответствующем запросе. ChunkingChannelListener создает экземпляры ChunkingChannel, когда принимается новый внутренний канал. ChunkingChannel отвечает за отправку и получение сообщений.

На следующем более низком уровне ChunkingChannel основывается на нескольких компонентах для реализации протокола фрагментации. На отправляющей стороне канал использует пользовательский XmlDictionaryWriter, называемый ChunkingWriter, который непосредственно выполняет фрагментацию. ChunkingWriter непосредственно использует внутренний канал для отправки фрагментов данных. Использование настраиваемого XmlDictionaryWriter позволяет отправлять фрагменты по мере записи большого тела исходного сообщения. Это означает, что все исходное сообщение не буферизуется.

Диаграмма, показывающая архитектуру отправки через каналы сегментирования.

На получающей стороне ChunkingChannel извлекает сообщения из внутреннего канала и передает их в пользовательский XmlDictionaryReader, называемый ChunkingReader, который восстанавливает исходное сообщение из входящих фрагментов данных. ChunkingChannel выступает в роли оболочки ChunkingReader в пользовательской реализации Message, называемой ChunkingMessage, и возвращает это сообщение на уровень выше. Данное сочетание ChunkingReader и ChunkingMessage позволяет дефрагментировать текст исходного сообщения в процессе его считывания вышележащим уровнем вместо необходимости помещения в буфер всего текста исходного сообщения. ChunkingReader имеет очередь, в которой буферизуются входящие фрагменты данных, пока их количество не достигнет заданного. При достижении этого максимального предела устройство чтения ожидает, пока сообщения не будут извлечены из очереди верхним уровнем (то есть просто считыванием из тела исходного сообщения) или до достижения максимального времени ожидания получения.

Схема, показывающая архитектуру приема канала сегментации.

Модель программирования фрагментации

Разработчики службы могут указать, какие сообщения фрагментируются с применением атрибута ChunkingBehavior к операциям в контракте. Атрибут предоставляет свойство AppliesTo, позволяющее разработчику указать, применяется ли фрагментация к входящему или исходящему сообщению или к обоим сообщениям. В следующем примере показано применение атрибута ChunkingBehavior.

[ServiceContract]
interface ITestService
{
    [OperationContract]
    [ChunkingBehavior(ChunkingAppliesTo.Both)]
    Stream EchoStream(Stream stream);

    [OperationContract]
    [ChunkingBehavior(ChunkingAppliesTo.OutMessage)]
    Stream DownloadStream();

    [OperationContract(IsOneWay=true)]
    [ChunkingBehavior(ChunkingAppliesTo.InMessage)]
    void UploadStream(Stream stream);

}

В этой модели программирования ChunkingBindingElement компилирует список URI действия, которые идентифицируют сообщения, разделяемые на части. Действие каждого исходящего сообщения сопоставляется с этим списком для определения, фрагментируется ли сообщение или отправляется непосредственно.

Реализация операции отправки

На высоком уровне операция отправки вначале проверяет, следует ли фрагментировать исходящее сообщение, и, если фрагментация не требуется, отправляет сообщение непосредственно по внутреннему каналу.

Если сообщение должно быть фрагментировано, операция Send создает новый ChunkingWriter, вызывает WriteBodyContents для исходящего сообщения и передает ему этот ChunkingWriter. Затем ChunkingWriter выполняет фрагментацию сообщения (с копированием заголовков исходного сообщения в начальный фрагмент сообщения) и отправляет фрагменты по внутреннему каналу.

Следует отметить следующие моменты.

  • Отправьте первые вызовы ThrowIfDisposedOrNotOpened, чтобы гарантировать, что CommunicationState открыто.

  • Операция отправки синхронизируется таким образом, что во время каждого сеанса может быть отправлено только одно сообщение. Существует событие ManualResetEvent с именем sendingDone, которое обнуляется при отправке chunked-сообщения. Это событие задается при отправке последнего фрагмента сообщения. Перед попыткой отправки исходящего сообщения метод Send ожидает, пока это событие не будет установлено.

  • Метод Send блокирует CommunicationObject.ThisLock, чтобы предотвратить изменения состояния синхронизации при отправке. См. документацию по CommunicationObject для получения дополнительной информации о состояниях и конечных автоматах CommunicationObject.

  • Время ожидания, передаваемое в метод отправки, используется как время ожидания выполнения всей операции отправки, то есть отправки всех фрагментов.

  • Пользовательский дизайн XmlDictionaryWriter был выбран, чтобы избежать буферизации всего тела исходного сообщения. Если бы мы применили XmlDictionaryReader к телу, используя message.GetReaderAtBodyContents, всё тело было бы буферизировано. Вместо этого мы используем пользовательский XmlDictionaryWriter, который передается в message.WriteBodyContents. Так как сообщение вызывает WriteBase64 для средства записи, средство записи упаковывает фрагменты в сообщения и отправляет их по внутреннему каналу. WriteBase64 выполняет блокировку пока фрагмент не будет отправлен.

Реализация операции получения

На высоком уровне операция получения сперва проверяет, что входящее сообщение не является null и его действием является ChunkingAction. Если сообщение не соответствует обоим указанным критериям, оно возвращается из функции Receive без изменений. В противном случае, Receive создает новый ChunkingReader и новый ChunkingMessage, оборачивающий его (вызывая GetNewChunkingMessage). Перед возвратом нового ChunkingMessage, Receive использует поток из пула потоков для выполнения ReceiveChunkLoop, который в цикле вызывает innerChannel.Receive и передает фрагменты в ChunkingReader, пока не будет получено сообщение с последним фрагментом или не истечет время ожидания получения.

Следует отметить следующие моменты.

  • Как и операция отправки, операция получения сначала вызывает ThrowIfDisposedOrNotOpened, чтобы убедиться, что CommunicationState открыто.

  • Операция получения также синхронизируется таким образом, что во время каждого сеанса может быть получено только одно сообщение. Это особенно важно, так как после того, как получен начальный фрагмент сообщения, ожидается, что все последующие сообщения являются фрагментами этой новой последовательности фрагментов до получения конечного фрагмента сообщения. Операция получения не может извлекать сообщения из внутреннего канала до тех пор, пока все фрагменты сообщения, дефрагментируемого в настоящий момент, не будут получены. Чтобы выполнить это действие, операция получения использует событие ManualResetEvent с именем currentMessageCompleted, которое задается при получении последнего фрагмента сообщения и сбрасывается при получении нового начального фрагмента сообщения.

  • В отличие от операции отправки, операция получения не препятствует синхронизированным переходам между состояниями при получении. Например, операция "Закрыть" может быть вызвана во время получения; она будет ждать завершения незавершённого получения исходного сообщения или до истечения заданного времени ожидания.

  • Время ожидания, передаваемое в метод получения, используется как время ожидания выполнения всей операции получения, то есть получения всех фрагментов.

  • Если уровень, считывающий сообщение, считывает тело сообщения с частотой более низкой, чем частота входящих фрагментов сообщения, ChunkingReader выполняет буферизацию входящих фрагментов до предела, установленного ChunkingBindingElement.MaxBufferedChunks. При достижении заданного предела фрагменты не извлекаются из более низкого уровня до тех пор, пока не будет потреблён буферизованный фрагмент или не истечет время ожидания операции получения.

CommunicationObject - переопределения

Открытие

OnOpen вызывает innerChannel.Open, чтобы открыть внутренний канал.

Закрытие

OnClose сначала задает stopReceive значение true, сигнализируя о необходимости остановить ReceiveChunkLoop. Затем оно ожидает receiveStoppedManualResetEvent, который устанавливается, когда ReceiveChunkLoop останавливается. Предполагая, что ReceiveChunkLoop остановится в течение заданного времени ожидания, OnClose вызывает innerChannel.Close с остающимся временем ожидания.

OnAbort

OnAbort вызывает innerChannel.Abort, чтобы прервать внутренний канал. Если есть ожидающий ReceiveChunkLoop, он получает исключение от ожидающего вызова innerChannel.Receive.

OnFaulted

Для ChunkingChannel не требуется специальное поведение при возникновении ошибки в канале, поэтому OnFaulted не переопределяется.

Реализация фабрики каналов

ChunkingChannelFactory отвечает за создание экземпляров ChunkingDuplexSessionChannel и каскадную передачу изменений состояния на внутреннюю фабрику каналов.

OnCreateChannel использует фабрику внутренних каналов для создания внутреннего канала IDuplexSessionChannel. Затем метод создает новый ChunkingDuplexSessionChannel, передавая ему этот внутренний канал вместе со списком действий фрагментируемых сообщений и максимальным количеством фрагментов, буферизуемых при получении. Список фрагментируемых сообщений и максимальное количество буферизуемых фрагментов - это два параметра, передаваемые ChunkingChannelFactory в его конструкторе. В разделе об ChunkingBindingElement описывается, откуда возникают эти значения.

Методы OnOpen, OnClose, OnAbort и их асинхронные эквиваленты вызывают соответствующий метод перехода между состояниями в фабрике внутренних каналов.

Реализация прослушивателя каналов

ChunkingChannelListener является оболочкой для внутреннего канала-приемника. Его основная функция, помимо делегирования вызовов в прослушиватель внутренних каналов, заключается в оборачивании новых ChunkingDuplexSessionChannels вокруг каналов, принимаемых из прослушивателя внутренних каналов. Это выполняется в OnAcceptChannel и OnEndAcceptChannel. Во внутренний канал передается вновь созданная ChunkingDuplexSessionChannel вместе с другими ранее описанными параметрами.

Реализация элемента привязки и связи

Элемент ChunkingBindingElement отвечает за создание ChunkingChannelFactory и ChunkingChannelListener. ПроверяетChunkingBindingElement, является ли T в CanBuildChannelFactory<T> и CanBuildChannelListener<T> типом IDuplexSessionChannel (единственным каналом, поддерживаемым каналом разбиения) и поддерживают ли другие элементы привязки этот тип канала.

BuildChannelFactory <Сначала T> проверяет, можно ли создать запрошенный тип канала, а затем получает список действий сообщения, которые будут разбиты на части. Дополнительные сведения см. в следующем разделе. Затем метод создает новый ChunkingChannelFactory, передавая ему внутреннюю фабрику каналов (как возвращено из context.BuildInnerChannelFactory<IDuplexSessionChannel>), список действий сообщений и максимальное количество фрагментов для буферизации. Максимальное количество фрагментов определяется свойством MaxBufferedChunks, предоставляемым ChunkingBindingElement.

BuildChannelListener<T> имеет аналогичную реализацию для создания ChunkingChannelListener и передачи его внутреннему прослушивателю канала.

В этом образце включен пример привязки с именем TcpChunkingBinding. Эта привязка состоит из двух элементов привязки: TcpTransportBindingElement и ChunkingBindingElement. Помимо предоставления свойства MaxBufferedChunks, привязка также задает некоторые свойства TcpTransportBindingElement, например MaxReceivedMessageSize (задает ему значение ChunkingUtils.ChunkSize + 100 КБ байтов для заголовков).

TcpChunkingBinding также реализует IBindingRuntimePreferences и возвращает значение "true" из метода ReceiveSynchronously, что указывает на то, что реализуются только синхронные вызовы получения.

Какие сообщения следует фрагментировать

Канал фрагментации фрагментирует только сообщения, определенные посредством атрибута ChunkingBehavior. Класс ChunkingBehavior реализует метод IOperationBehavior и реализуется с помощью вызова метода AddBindingParameter. В этом методе ChunkingBehavior проверяет значение свойства AppliesTo (InMessage, OutMessage или обоих), чтобы определить, какие сообщения следует фрагментировать. Затем он получает действие для каждого из этих сообщений (из коллекции сообщений в OperationDescription) и добавляет его в коллекцию строк, которая содержится в экземпляре ChunkingBindingParameter. Затем он добавляет этот элемент ChunkingBindingParameter в предоставленный BindingParameterCollection.

BindingParameterCollection передается внутри BindingContext каждому элементу привязки в привязке при построении этим элементом привязки фабрики каналов или прослушивателя каналов. Реализация ChunkingBindingElement и BuildChannelFactory<T> извлекают этот BuildChannelListener<T> изChunkingBindingParameterBindingContext'. BindingParameterCollection Коллекция действий, содержащихся в параметре ChunkingBindingParameter, затем передаётся на ChunkingChannelFactory или ChunkingChannelListener, которые, в свою очередь, передают её в ChunkingDuplexSessionChannel.

Запуск примера

Установка, сборка и выполнение примера

  1. Установите ASP.NET 4.0 с помощью следующей команды.

    %windir%\Microsoft.NET\Framework\v4.0.XXXXX\aspnet_regiis.exe /i /enable
    
  2. Убедитесь, что вы выполнили процедуру однократной настройки для образцов Windows Communication Foundation.

  3. Чтобы создать решение, следуйте инструкциям в разделе Building the Windows Communication Foundation Samples.

  4. Чтобы запустить пример в конфигурации с одним или несколькими компьютерами, следуйте инструкциям в разделе "Примеры Windows Communication Foundation".

  5. Сначала запустите файл Service.exe, затем Client.exe и наблюдайте результат в обоих окнах консоли.

При выполнении этого примера ожидаются следующие данные на выходе.

Клиент —

Press enter when service is available

 > Sent chunk 1 of message 867c1fd1-d39e-4be1-bc7b-32066d7ced10
 > Sent chunk 2 of message 867c1fd1-d39e-4be1-bc7b-32066d7ced10
 > Sent chunk 3 of message 867c1fd1-d39e-4be1-bc7b-32066d7ced10
 > Sent chunk 4 of message 867c1fd1-d39e-4be1-bc7b-32066d7ced10
 > Sent chunk 5 of message 867c1fd1-d39e-4be1-bc7b-32066d7ced10
 > Sent chunk 6 of message 867c1fd1-d39e-4be1-bc7b-32066d7ced10
 > Sent chunk 7 of message 867c1fd1-d39e-4be1-bc7b-32066d7ced10
 > Sent chunk 8 of message 867c1fd1-d39e-4be1-bc7b-32066d7ced10
 > Sent chunk 9 of message 867c1fd1-d39e-4be1-bc7b-32066d7ced10
 > Sent chunk 10 of message 867c1fd1-d39e-4be1-bc7b-32066d7ced10
 < Received chunk 1 of message 5b226ad5-c088-4988-b737-6a565e0563dd
 < Received chunk 2 of message 5b226ad5-c088-4988-b737-6a565e0563dd
 < Received chunk 3 of message 5b226ad5-c088-4988-b737-6a565e0563dd
 < Received chunk 4 of message 5b226ad5-c088-4988-b737-6a565e0563dd
 < Received chunk 5 of message 5b226ad5-c088-4988-b737-6a565e0563dd
 < Received chunk 6 of message 5b226ad5-c088-4988-b737-6a565e0563dd
 < Received chunk 7 of message 5b226ad5-c088-4988-b737-6a565e0563dd
 < Received chunk 8 of message 5b226ad5-c088-4988-b737-6a565e0563dd
 < Received chunk 9 of message 5b226ad5-c088-4988-b737-6a565e0563dd
 < Received chunk 10 of message 5b226ad5-c088-4988-b737-6a565e0563dd

Сервер:

Service started, press enter to exit
 < Received chunk 1 of message 867c1fd1-d39e-4be1-bc7b-32066d7ced10
 < Received chunk 2 of message 867c1fd1-d39e-4be1-bc7b-32066d7ced10
 < Received chunk 3 of message 867c1fd1-d39e-4be1-bc7b-32066d7ced10
 < Received chunk 4 of message 867c1fd1-d39e-4be1-bc7b-32066d7ced10
 < Received chunk 5 of message 867c1fd1-d39e-4be1-bc7b-32066d7ced10
 < Received chunk 6 of message 867c1fd1-d39e-4be1-bc7b-32066d7ced10
 < Received chunk 7 of message 867c1fd1-d39e-4be1-bc7b-32066d7ced10
 < Received chunk 8 of message 867c1fd1-d39e-4be1-bc7b-32066d7ced10
 < Received chunk 9 of message 867c1fd1-d39e-4be1-bc7b-32066d7ced10
 < Received chunk 10 of message 867c1fd1-d39e-4be1-bc7b-32066d7ced10
 > Sent chunk 1 of message 5b226ad5-c088-4988-b737-6a565e0563dd
 > Sent chunk 2 of message 5b226ad5-c088-4988-b737-6a565e0563dd
 > Sent chunk 3 of message 5b226ad5-c088-4988-b737-6a565e0563dd
 > Sent chunk 4 of message 5b226ad5-c088-4988-b737-6a565e0563dd
 > Sent chunk 5 of message 5b226ad5-c088-4988-b737-6a565e0563dd
 > Sent chunk 6 of message 5b226ad5-c088-4988-b737-6a565e0563dd
 > Sent chunk 7 of message 5b226ad5-c088-4988-b737-6a565e0563dd
 > Sent chunk 8 of message 5b226ad5-c088-4988-b737-6a565e0563dd
 > Sent chunk 9 of message 5b226ad5-c088-4988-b737-6a565e0563dd
 > Sent chunk 10 of message 5b226ad5-c088-4988-b737-6a565e0563dd