xin9le.net

Microsoft の製品/技術が大好きな Microsoft MVP な管理人の技術ブログです。

SqlBulkCopy + IDataReader を利用した IEnumerable<T> の高効率なバルク挿入

C# / .NET + SQL Server 環境において Bulk Insert をするのはなかなかお手間です。それもこれも SqlBulkCopy という専用クラスがなかなか曲者なためなのですが。ただただ IEnumerable<T> のようなコレクションを挿入するのにひと工夫というか、ひと手間必要なのが心理的にハードルが高い。初見殺し!

僕自身、過去にやっていたのは DataTable を利用した方法でした。以下の記事に良い例が載っているので詳細はそちらに譲りますが、DataTable のインスタンスを作るにあたって一度データのコピーが必要になる上に、要素数が多いと内部の動的配列の拡張が何度も発生して遅くなってしまいます。折角パフォーマンスよくデータ挿入ができる SqlBulkCopy なのに、その事前準備で遅くなってどうするんだ!ということでなんとかしたい。

IDataReader を独自実装

SqlBulkCopy はそのデータソースとして IDataReader をとることができます。これを実装しさえすれば任意のデータソースと繋ぎ込めるというわけですね。ということで、今回は IEnumerable<T> なコレクションを IDataReader 経由で投げ込むことを考えてみます。

IDataReader に求められる実装

IDataReader を実装しようとすると、とんでもなくたくさんのプロパティとメソッドの実装を要求されます。ブログに書くには長過ぎるので折り畳んでおきますが、正直これを見たら卒倒するか吐くかしますね、普通。

IDataReader が要求する実装

public class DummyDataReader : IDataReader
{
    #region IDataReader implementations
    /// <inheritdoc/>
    public object this[int i]
        => throw new NotImplementedException();


    /// <inheritdoc/>
    public object this[string name]
        => throw new NotImplementedException();


    /// <inheritdoc/>
    public int Depth
        => throw new NotImplementedException();


    /// <inheritdoc/>
    public bool IsClosed
        => throw new NotImplementedException();


    /// <inheritdoc/>
    public int RecordsAffected
        => throw new NotImplementedException();


    /// <inheritdoc/>
    public int FieldCount
        => throw new NotImplementedException();


    /// <inheritdoc/>
    public void Close()
        => throw new NotImplementedException();


    /// <inheritdoc/>
    public void Dispose()
        => throw new NotImplementedException();


    /// <inheritdoc/>
    public bool GetBoolean(int i)
        => throw new NotImplementedException();


    /// <inheritdoc/>
    public byte GetByte(int i)
        => throw new NotImplementedException();


    /// <inheritdoc/>
    public long GetBytes(int i, long fieldOffset, byte[]? buffer, int bufferoffset, int length)
        => throw new NotImplementedException();


    /// <inheritdoc/>
    public char GetChar(int i)
        => throw new NotImplementedException();


    /// <inheritdoc/>
    public long GetChars(int i, long fieldoffset, char[]? buffer, int bufferoffset, int length)
        => throw new NotImplementedException();


    /// <inheritdoc/>
    public IDataReader GetData(int i)
        => throw new NotImplementedException();


    /// <inheritdoc/>
    public string GetDataTypeName(int i)
        => throw new NotImplementedException();


    /// <inheritdoc/>
    public DateTime GetDateTime(int i)
        => throw new NotImplementedException();


    /// <inheritdoc/>
    public decimal GetDecimal(int i)
        => throw new NotImplementedException();


    /// <inheritdoc/>
    public double GetDouble(int i)
        => throw new NotImplementedException();


    /// <inheritdoc/>
    [return: DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.PublicFields | DynamicallyAccessedMemberTypes.PublicProperties)]
    public Type GetFieldType(int i)
        => throw new NotImplementedException();


    /// <inheritdoc/>
    public float GetFloat(int i)
        => throw new NotImplementedException();


    /// <inheritdoc/>
    public Guid GetGuid(int i)
        => throw new NotImplementedException();


    /// <inheritdoc/>
    public short GetInt16(int i)
        => throw new NotImplementedException();


    /// <inheritdoc/>
    public int GetInt32(int i)
        => throw new NotImplementedException();


    /// <inheritdoc/>
    public long GetInt64(int i)
        => throw new NotImplementedException();


    /// <inheritdoc/>
    public string GetName(int i)
        => throw new NotImplementedException();


    /// <inheritdoc/>
    public int GetOrdinal(string name)
        => throw new NotImplementedException();


    /// <inheritdoc/>
    public DataTable? GetSchemaTable()
        => throw new NotImplementedException();


    /// <inheritdoc/>
    public string GetString(int i)
        => throw new NotImplementedException();


    /// <inheritdoc/>
    public object GetValue(int i)
        => throw new NotImplementedException();


    /// <inheritdoc/>
    public int GetValues(object[] values)
        => throw new NotImplementedException();


    /// <inheritdoc/>
    public bool IsDBNull(int i)
        => throw new NotImplementedException();


    /// <inheritdoc/>
    public bool NextResult()
        => throw new NotImplementedException();


    /// <inheritdoc/>
    public bool Read()
        => throw new NotImplementedException();
    #endregion
}

しかし、実はこれ全部を実装する必要はありません。SqlBulkCopy の内部実装を追いかけたりしたところ、実際に必要となるのは以下の 3 つのメソッド / プロパティだけです。

  • int FieldCount
  • object GetValue(int i)
  • bool Read()

他は全て NotImplementedException のままでも大丈夫です。なんだよー、これなら全然行けるじゃん...!

public class SqlBulkCopyDataReader : IDataReader
{
    #region IDataReader implementations
    /// <inheritdoc/>
    public int FieldCount
        => 0;


    /// <inheritdoc/>
    public object GetValue(int i)
        => null;


    /// <inheritdoc/>
    public bool Read()
        => true;
    #endregion
}

ざっくりと実装してみる

ということで実装していきましょう。いろいろ端折ってますが、概ね以下のような感じになります。

public class SqlBulkCopyDataReader<T> : IDataReader
{
    private IEnumerator<T> DataEnumerator { get; }

    public SqlBulkCopyDataReader(IEnumerator<T> enumerator)
        => this.DataEnumerator = enumerator;

    public SqlBulkCopyDataReader(IEnumerable<T> data)
        : this(data.GetEnumerator())
    { }

    #region IDataReader implementations
    public int FieldCount
        => PropertyInfoCache<T>.Instances.Length;

    public void Dispose()
        => this.DataEnumerator.Dispose();

    public object GetValue(int i)
    {
        // 対象テーブルの列とプロパティの個数 / 並び順が一致している前提
        var prop = PropertyInfoCache<T>.Instances[i];
        var obj = this.DataEnumerator.Current;
        return prop.GetValue(obj)!;
    }

    public bool Read()
        => this.DataEnumerator.MoveNext();
    #endregion

    private static class PropertyInfoCache<U>
    {
        public static PropertyInfo[] Instances { get; }

        static PropertyInfoCache()
            => Instances = typeof(U).GetProperties(BindingFlags.Instance | BindingFlags.Public);
    }
}

IEnumerator<T>.Dispose() を呼び出すために Dispose() メソッドも追加になっていますが、それでもメチャ短い!

SqlBulkCopy に食わせてみる

下準備が整ったので実際に SqlBulkCopy と組み合わせて使ってみましょう。折角なので SqlConnection の拡張メソッドでも作ってみます。実際にはまだもうちょっと考慮すべき点がありそうですが、サンプルとしてはだいぶいい感じになっていると思います。

public static class SqlConnectionExtensions
{
    public static async ValueTask<int> BulkInsertAsync<T>(this SqlConnection connection, IEnumerable<T> data, SqlBulkCopyOptions options = default, int? timeout = null, CancellationToken cancellationToken = default)
    {
        using (var executor = new SqlBulkCopy(connection, options, null))
        {
            // テーブル名と型名が一致しているとする
            executor.DestinationTableName = typeof(T).Name;

            // タイムアウトを指定できるようにしておくと優しそう
            executor.BulkCopyTimeout = timeout ?? executor.BulkCopyTimeout;

            // データを流し込む
            using (var reader = new SqlBulkCopyDataReader<T>(data))
                await executor.WriteToServerAsync(reader, cancellationToken);

            // 影響した行数 (= 流し込んだ件数) を返すのが一般的
            return executor.RowsCopied;
        }
    }
}

まとめ

ずっと IDataReader の実装が嫌過ぎて、単純な食わず嫌いで DataTable に甘んじてましたが、気が向いて SqlBulkCopy の内部実装を追いかけてみたら実は全然大した実装が必要ないことを知ってぴえん。10 年くらい無駄なことをしてたと思うとだいぶ勿体ないことをしてました。反省。