Поделиться через


Поддержка потоковой передачи в SqlClient

Поддержка потоковой передачи между SQL Server и приложением (новая в платформа .NET Framework 4.5) поддерживает неструктурированные данные на сервере (документы, изображения и файлы мультимедиа). База данных SQL Server может хранить большие двоичные объекты (BLOB), но для их извлечения может потребоваться много памяти.

Поддержка потоковой передачи данных в SQL Server и обратно упрощает написание приложений, которые выполняют потоковую передачу данных, что позволяет обойтись без полной загрузки данных в память и, следовательно, снизить количество исключений, связанных с переполнением памяти.

Поддержка потоковой передачи данных способствует также лучшему масштабированию приложений среднего уровня, особенно в сценариях, в которых бизнес-объекты подключаются к SQL Azure для отправки, получения и обработки больших двоичных объектов.

Предупреждение

Асинхронные вызовы не поддерживаются, если в приложении также используется ключевое слово строки соединения Context Connection.

Элементы, добавленные для поддержки потоков, используются для получения данных из запросов и передачи параметров в запросы и хранимые процедуры. Функция потоковой передачи предназначена для основных сценариев OLTP и миграции данных и применима к локальным и удаленным средам переноса данных.

Поддержка потоковой передачи из SQL Server

Поддержка потоковой передачи из SQL Server привела к появлению новых функциональных возможностей в классах DbDataReader и SqlDataReader для получения объектов Stream, XmlReader и TextReader, а также реагирования на них. Эти классы используются для получения данных из запросов. В результате поддержка потоковой передачи из SQL Server обращается к сценариям OLTP и применяется к локальной и удаленной средам.

Для активации поддержки потоковой передачи из SQL Server в SqlDataReader добавлены следующие элементы:

Для активации поддержки потоковой передачи из SQL Server в DbDataReader добавлены следующие элементы:

Поддержка потоковой передачи в SQL Server

Поддержка потоковой передачи в SQL Server предоставляет новые функциональные возможности в SqlParameter классе, чтобы он принимал и реагировать на объекты и StreamTextReader реагировать на XmlReaderних. SqlParameter используется для передачи параметров в запросы и хранимые процедуры.

Удаление объекта SqlCommand или вызов Cancel должны приводить к отмене любой потоковой операции. Если приложение передает CancellationToken, отмена не гарантируется.

Следующие типы SqlDbType могут принимать объект Value из Stream:

  • Binary
  • VarBinary

Следующие типы SqlDbType могут принимать объект Value из TextReader:

  • Char
  • NChar
  • NVarChar
  • Xml

Тип Xml SqlDbType принимает Value из XmlReader

SqlValue может принимать значения типа XmlReader, TextReader и Stream.

Объекты XmlReader, TextReader и Stream будут перенесены вплоть до значения, определяемого в Size.

Пример. Потоковая передача из SQL Server

Для создания примера базы данных используйте следующий язык Transact-SQL:

CREATE DATABASE [Demo]
GO
USE [Demo]
GO
CREATE TABLE [Streams] (
[id] INT PRIMARY KEY IDENTITY(1, 1),
[textdata] NVARCHAR(MAX),
[bindata] VARBINARY(MAX),
[xmldata] XML)
GO
INSERT INTO [Streams] (textdata, bindata, xmldata) VALUES (N'This is a test', 0x48656C6C6F, N'<test>value</test>')
INSERT INTO [Streams] (textdata, bindata, xmldata) VALUES (N'Hello, World!', 0x54657374696E67, N'<test>value2</test>')
INSERT INTO [Streams] (textdata, bindata, xmldata) VALUES (N'Another row', 0x666F6F626172, N'<fff>bbb</fff><fff>bbc</fff>')
GO

В образце демонстрируется выполнение следующих действий.

  • Избегайте блокирования потока пользовательского интерфейса путем предоставления асинхронного способа извлечения больших файлов.
  • Передача большого текстового файла из SQL Server в платформа .NET Framework 4.5.
  • Передача большого XML-файла из SQL Server в платформа .NET Framework 4.5.
  • Получение данных из SQL Server.
  • Перенос файлов больших двоичных объектов из одной базы данных SQL Server в другую, не сталкиваясь с проблемой нехватки памяти.
using System;
using System.Data;
using System.Data.SqlClient;
using System.IO;
using System.Threading.Tasks;
using System.Xml;

namespace StreamingFromServer {
   class Program {
      private const string connectionString = @"...";

      static void Main(string[] args) {
         CopyBinaryValueToFile().Wait();
         PrintTextValues().Wait();
         PrintXmlValues().Wait();
         PrintXmlValuesViaNVarChar().Wait();

         Console.WriteLine("Done");
      }

      // Retrieve a large BLOB from SQL Server in .NET Framework 4.5 using the asynchronous capability.
      private static async Task CopyBinaryValueToFile() {
         string filePath = Path.Combine(Environment.GetFolderPath(Environment.SpecialFolder.MyDocuments), "binarydata.bin");

         using (SqlConnection connection = new SqlConnection(connectionString)) {
            await connection.OpenAsync();
            using (SqlCommand command = new SqlCommand("SELECT [bindata] FROM [Streams] WHERE [id]=@id", connection)) {
               command.Parameters.AddWithValue("id", 1);

               // The reader needs to be executed with the SequentialAccess behavior to enable network streaming.
               // Otherwise ReadAsync will buffer the entire BLOB into memory which can cause scalability issues or even OutOfMemoryExceptions.
               using (SqlDataReader reader = await command.ExecuteReaderAsync(CommandBehavior.SequentialAccess)) {
                  if (await reader.ReadAsync()) {
                     if (!(await reader.IsDBNullAsync(0))) {
                        using (FileStream file = new FileStream(filePath, FileMode.Create, FileAccess.Write)) {
                           using (Stream data = reader.GetStream(0)) {

                              // Asynchronously copy the stream from the server to the file we just created.
                              await data.CopyToAsync(file);
                           }
                        }
                     }
                  }
               }
            }
         }
      }

      // Transfer a large Text File from SQL Server in .NET Framework 4.5.
      private static async Task PrintTextValues() {
         using (SqlConnection connection = new SqlConnection(connectionString)) {
            await connection.OpenAsync();
            using (SqlCommand command = new SqlCommand("SELECT [id], [textdata] FROM [Streams]", connection)) {

               // The reader needs to be executed with the SequentialAccess behavior to enable network streaming.
               // Otherwise ReadAsync will buffer the entire text document into memory which can cause scalability issues or even OutOfMemoryExceptions.
               using (SqlDataReader reader = await command.ExecuteReaderAsync(CommandBehavior.SequentialAccess)) {
                  while (await reader.ReadAsync()) {
                     Console.Write("{0}: ", reader.GetInt32(0));

                     if (await reader.IsDBNullAsync(1)) {
                        Console.Write("(NULL)");
                     }
                     else {
                        char[] buffer = new char[4096];
                        int charsRead = 0;
                        using (TextReader data = reader.GetTextReader(1)) {
                           do {
                              // Grab each chunk of text and write it to the console.
                              // If you are writing to a TextWriter, you should use WriteAsync or WriteLineAsync.
                              charsRead = await data.ReadAsync(buffer, 0, buffer.Length);
                              Console.Write(buffer, 0, charsRead);
                           } while (charsRead > 0);
                        }
                     }

                     Console.WriteLine();
                  }
               }
            }
         }
      }

      // Transfer a large Xml Document from SQL Server in .NET Framework 4.5.
      private static async Task PrintXmlValues() {
         using (SqlConnection connection = new SqlConnection(connectionString)) {
            await connection.OpenAsync();
            using (SqlCommand command = new SqlCommand("SELECT [id], [xmldata] FROM [Streams]", connection)) {

               // The reader needs to be executed with the SequentialAccess behavior to enable network streaming.
               // Otherwise ReadAsync will buffer the entire Xml Document into memory which can cause scalability issues or even OutOfMemoryExceptions.
               using (SqlDataReader reader = await command.ExecuteReaderAsync(CommandBehavior.SequentialAccess)) {
                  while (await reader.ReadAsync()) {
                     Console.WriteLine("{0}: ", reader.GetInt32(0));

                     if (await reader.IsDBNullAsync(1)) {
                        Console.WriteLine("\t(NULL)");
                     }
                     else {
                        using (XmlReader xmlReader = reader.GetXmlReader(1)) {
                           int depth = 1;
                           // NOTE: The XmlReader returned by GetXmlReader does NOT support async operations.
                           // See the example below (PrintXmlValuesViaNVarChar) for how to get an XmlReader with asynchronous capabilities.
                           while (xmlReader.Read()) {
                              switch (xmlReader.NodeType) {
                                 case XmlNodeType.Element:
                                    Console.WriteLine("{0}<{1}>", new string('\t', depth), xmlReader.Name);
                                    depth++;
                                    break;
                                 case XmlNodeType.Text:
                                    Console.WriteLine("{0}{1}", new string('\t', depth), xmlReader.Value);
                                    break;
                                 case XmlNodeType.EndElement:
                                    depth--;
                                    Console.WriteLine("{0}</{1}>", new string('\t', depth), xmlReader.Name);
                                    break;
                              }
                           }
                        }
                     }
                  }
               }
            }
         }
      }

      // Transfer a large Xml Document from SQL Server in .NET Framework 4.5.
      // This goes via NVarChar and TextReader to enable asynchronous reading.
      private static async Task PrintXmlValuesViaNVarChar() {
         XmlReaderSettings xmlSettings = new XmlReaderSettings() {
            // Async must be explicitly enabled in the XmlReaderSettings otherwise the XmlReader will throw exceptions when async methods are called.
            Async = true,
            // Since we will immediately wrap the TextReader we are creating in an XmlReader, we will permit the XmlReader to take care of closing\disposing it.
            CloseInput = true,
            // If the Xml you are reading is not a valid document (as per <https://learn.microsoft.com/previous-versions/dotnet/netframework-4.0/6bts1x50(v=vs.100)>) you will need to set the conformance level to Fragment.
            ConformanceLevel = ConformanceLevel.Fragment
         };

         using (SqlConnection connection = new SqlConnection(connectionString)) {
            await connection.OpenAsync();

            // Cast the XML into NVarChar to enable GetTextReader - trying to use GetTextReader on an XML type will throw an exception.
            using (SqlCommand command = new SqlCommand("SELECT [id], CAST([xmldata] AS NVARCHAR(MAX)) FROM [Streams]", connection)) {

               // The reader needs to be executed with the SequentialAccess behavior to enable network streaming.
               // Otherwise ReadAsync will buffer the entire Xml Document into memory which can cause scalability issues or even OutOfMemoryExceptions.
               using (SqlDataReader reader = await command.ExecuteReaderAsync(CommandBehavior.SequentialAccess)) {
                  while (await reader.ReadAsync()) {
                     Console.WriteLine("{0}:", reader.GetInt32(0));

                     if (await reader.IsDBNullAsync(1)) {
                        Console.WriteLine("\t(NULL)");
                     }
                     else {
                        // Grab the row as a TextReader, then create an XmlReader on top of it.
                        // The code doesn't keep a reference to the TextReader since the XmlReader is created with the "CloseInput" setting (so it will close the TextReader when needed).
                        using (XmlReader xmlReader = XmlReader.Create(reader.GetTextReader(1), xmlSettings)) {
                           int depth = 1;
                           // The XmlReader above now supports asynchronous operations, so we can use ReadAsync here.
                           while (await xmlReader.ReadAsync()) {
                              switch (xmlReader.NodeType) {
                                 case XmlNodeType.Element:
                                    Console.WriteLine("{0}<{1}>", new string('\t', depth), xmlReader.Name);
                                    depth++;
                                    break;
                                 case XmlNodeType.Text:
                                    // Depending on what your data looks like, you should either use Value or GetValueAsync.
                                    // Value has less overhead (since it doesn't create a Task), but it may also block if additional data is required.
                                    Console.WriteLine("{0}{1}", new string('\t', depth), await xmlReader.GetValueAsync());
                                    break;
                                 case XmlNodeType.EndElement:
                                    depth--;
                                    Console.WriteLine("{0}</{1}>", new string('\t', depth), xmlReader.Name);
                                    break;
                              }
                           }
                        }
                     }
                  }
               }
            }
         }
      }
   }
}

Пример. Потоковая передача в SQL Server

Для создания примера базы данных используйте следующий язык Transact-SQL:

CREATE DATABASE [Demo2]
GO
USE [Demo2]
GO
CREATE TABLE [BinaryStreams] (
[id] INT PRIMARY KEY IDENTITY(1, 1),
[bindata] VARBINARY(MAX))
GO
CREATE TABLE [TextStreams] (
[id] INT PRIMARY KEY IDENTITY(1, 1),
[textdata] NVARCHAR(MAX))
GO
CREATE TABLE [BinaryStreamsCopy] (
[id] INT PRIMARY KEY IDENTITY(1, 1),
[bindata] VARBINARY(MAX))
GO

В образце демонстрируется выполнение следующих действий.

  • Передача большого БОЛЬШОго BLOB-объекта в SQL Server в платформа .NET Framework 4.5.
  • Передача большого текстового файла в SQL Server в платформа .NET Framework 4.5.
  • Используйте новую асинхронную функцию для передачи большого BLOB-объекта.
  • Используйте новую асинхронную функцию и ключевое слово await для передачи большого BLOB-объекта.
  • Отмена передачи большого BLOB-объекта.
  • Потоковая передача из одного SQL Server в другой с помощью новой асинхронной функции.
using System;
using System.Data;
using System.Data.SqlClient;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

namespace StreamingToServer {
   class Program {
      private const string connectionString = @"...";

      static void Main(string[] args) {
         CreateDemoFiles();

         StreamBLOBToServer().Wait();
         StreamTextToServer().Wait();

         // Create a CancellationTokenSource that will be cancelled after 100ms
         // Typically this token source will be cancelled by a user request (e.g. a Cancel button).
         CancellationTokenSource tokenSource = new CancellationTokenSource();
         tokenSource.CancelAfter(100);
         try {
            CancelBLOBStream(tokenSource.Token).Wait();
         }
         catch (AggregateException ex) {
            // Cancelling an async operation will throw an exception.
            // Since we are using the Task's Wait method, this exception will be wrapped in an AggregateException.
            // If you were using the 'await' keyword, the compiler would take care of unwrapping the AggregateException.
            // Depending on when the cancellation occurs, you can either get an error from SQL Server or from .Net.
            if ((ex.InnerException is SqlException) || (ex.InnerException is TaskCanceledException)) {
               // This is an expected exception.
               Console.WriteLine("Got expected exception: {0}", ex.InnerException.Message);
            }
            else {
               // Did not expect this exception - rethrow it.
               throw;
            }
         }

         Console.WriteLine("Done");
      }

      // This is used to generate the files which are used by the other sample methods.
      private static void CreateDemoFiles() {
         Random rand = new Random();
         byte[] data = new byte[1024];
         rand.NextBytes(data);

         using (FileStream file = File.Open("binarydata.bin", FileMode.Create)) {
            file.Write(data, 0, data.Length);
         }

         using (StreamWriter writer = new StreamWriter(File.Open("textdata.txt", FileMode.Create))) {
            writer.Write(Convert.ToBase64String(data));
         }
      }

      // Transfer a large BLOB to SQL Server in .NET Framework 4.5.
      private static async Task StreamBLOBToServer() {
         using (SqlConnection conn = new SqlConnection(connectionString)) {
            await conn.OpenAsync();
            using (SqlCommand cmd = new SqlCommand("INSERT INTO [BinaryStreams] (bindata) VALUES (@bindata)", conn)) {
               using (FileStream file = File.Open("binarydata.bin", FileMode.Open)) {

                  // Add a parameter which uses the FileStream we just opened
                  // Size is set to -1 to indicate "MAX".
                  cmd.Parameters.Add("@bindata", SqlDbType.Binary, -1).Value = file;

                  // Send the data to the server asynchronously.
                  await cmd.ExecuteNonQueryAsync();
               }
            }
         }
      }

      // Transfer a large Text File to SQL Server in .NET Framework 4.5.
      private static async Task StreamTextToServer() {
         using (SqlConnection conn = new SqlConnection(connectionString)) {
            await conn.OpenAsync();
            using (SqlCommand cmd = new SqlCommand("INSERT INTO [TextStreams] (textdata) VALUES (@textdata)", conn)) {
               using (StreamReader file = File.OpenText("textdata.txt")) {

                  // Add a parameter which uses the StreamReader we just opened.
                  // Size is set to -1 to indicate "MAX".
                  cmd.Parameters.Add("@textdata", SqlDbType.NVarChar, -1).Value = file;

                  // Send the data to the server asynchronously.
                  await cmd.ExecuteNonQueryAsync();
               }
            }
         }
      }

      // Cancel the transfer of a large BLOB.
      private static async Task CancelBLOBStream(CancellationToken cancellationToken) {
         using (SqlConnection conn = new SqlConnection(connectionString)) {
            // We can cancel not only sending the data to the server, but also opening the connection.
            await conn.OpenAsync(cancellationToken);

            // Artificially delay the command by 100ms.
            using (SqlCommand cmd = new SqlCommand("WAITFOR DELAY '00:00:00:100';INSERT INTO [BinaryStreams] (bindata) VALUES (@bindata)", conn)) {
               using (FileStream file = File.Open("binarydata.bin", FileMode.Open)) {

                  // Add a parameter which uses the FileStream we just opened.
                  // Size is set to -1 to indicate "MAX".
                  cmd.Parameters.Add("@bindata", SqlDbType.Binary, -1).Value = file;

                  // Send the data to the server asynchronously.
                  // Pass the cancellation token such that the command will be cancelled if needed.
                  await cmd.ExecuteNonQueryAsync(cancellationToken);
               }
            }
         }
      }
   }
}

Пример. Потоковая передача из одного SQL Server в другой SQL Server

В примере показано, как асинхронно передавать большой двоичный объект из одного объекта SQL Server в другой с поддержкой отмены.

using System;
using System.Data;
using System.Data.SqlClient;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

namespace StreamingFromServerToAnother {
   class Program {
      private const string connectionString = @"...";

      static void Main(string[] args) {
         // For this example, we don't want to cancel,
         // so pass in a "blank" cancellation token.
         E2EStream(CancellationToken.None).Wait();

         Console.WriteLine("Done");
      }

      // Streaming from one SQL Server to Another One using the new Async.NET.
      private static async Task E2EStream(CancellationToken cancellationToken) {
         using (SqlConnection readConn = new SqlConnection(connectionString)) {
            using (SqlConnection writeConn = new SqlConnection(connectionString)) {

               // Note that we are using the same cancellation token for calls to both connections\commands.
               // Also we can start both the connection opening asynchronously, and then wait for both to complete.
               Task openReadConn = readConn.OpenAsync(cancellationToken);
               Task openWriteConn = writeConn.OpenAsync(cancellationToken);
               await Task.WhenAll(openReadConn, openWriteConn);

               using (SqlCommand readCmd = new SqlCommand("SELECT [bindata] FROM [BinaryStreams]", readConn)) {
                  using (SqlCommand writeCmd = new SqlCommand("INSERT INTO [BinaryStreamsCopy] (bindata) VALUES (@bindata)", writeConn)) {

                     // Add an empty parameter to the write command which will be used for the streams we are copying.
                     // Size is set to -1 to indicate "MAX".
                     SqlParameter streamParameter = writeCmd.Parameters.Add("@bindata", SqlDbType.Binary, -1);

                     // The reader needs to be executed with the SequentialAccess behavior to enable network streaming.
                     // Otherwise ReadAsync will buffer the entire BLOB into memory which can cause scalability issues or even OutOfMemoryExceptions.
                     using (SqlDataReader reader = await readCmd.ExecuteReaderAsync(CommandBehavior.SequentialAccess, cancellationToken)) {
                        while (await reader.ReadAsync(cancellationToken)) {
                           // Grab a stream to the binary data in the source database.
                           using (Stream dataStream = reader.GetStream(0)) {

                              // Set the parameter value to the stream source that was opened.
                              streamParameter.Value = dataStream;

                              // Asynchronously send data from one database to another.
                              await writeCmd.ExecuteNonQueryAsync(cancellationToken);
                           }
                        }
                     }
                  }
               }
            }
         }
      }
   }
}

См. также