Entity Framework 6 – Bulk Insert and Returning of Generated Primary Keys

Bit of a code dump today. I was tasked with making database insertions faster. On a good day we insert millions of rows, and we were doing it through the Entity Framework – one row at a time, which is very inefficient.

I started by working with the EFUtilities library, which looked very promising – it uses the Entity Framework’s metadata and model to create a temporary table, bulk-inserts into it, and then copies the data to the real table. The following code expands the capabilities of the library:

  • Improved thread-safety.
  • Added a mode that ignores failures – sometimes we want to insert millions of rows and don’t really care about a few invalid ones.
  • The main motivation – returning the generated primary keys (or IDs) of the inserted rows. EFUtilities leaves all IDs as 0, which is very limiting.


The main idea is that when the rows are copied from the temporary table to the real table, they are inserted in a known order, and the generated primary keys are returned using the OUTPUT clause. I’m also using ORDER BY to make sure the rows are inserted in the right order, which isn’t guaranteed otherwise.
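The statement that moves the rows has roughly this shape – a simplified sketch with hypothetical table and column names, not the exact SQL (the real statement is built dynamically in the code below):

```sql
-- Move staged rows from the work table to the real table, capturing the
-- identity values SQL Server generates via the OUTPUT clause.
INSERT INTO [Spaceships] ([Name], [Capacity])
OUTPUT inserted.[SpaceshipId]
SELECT [Name], [Capacity]
FROM [temp_Spaceships_workTable] temp
ORDER BY temp.[SpaceshipId]; -- dummy keys were set in ascending order beforehand
```

The ORDER BY on the staged dummy keys is what makes the mapping between items and generated IDs recoverable.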

Note that as it is, the code only supports primary keys of type long (BigInt), but it should be straightforward to change that or make the code more generic.
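For reference, a more generic interface might look something like the following. This is a hypothetical sketch, not part of the code below – the provider’s `GetInt64(0)` call and the dummy-ID logic would need matching changes:

```csharp
// Hypothetical generic variant of IHasPrimaryKey for non-BigInt keys (e.g. Guid).
// The SQL provider would then read keys with recordIdReader.GetFieldValue<TKey>(0)
// instead of GetInt64(0).
public interface IHasPrimaryKey<TKey> where TKey : IComparable<TKey>
{
    TKey PrimaryKey { get; set; }
    string PrimaryKeyPropertyName { get; }
}
```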

Source Code

using System;
using System.Collections;
using System.Collections.Generic;
using System.Data;
using System.Data.Common;
using System.Data.Entity;
using System.Data.SqlClient;
using System.Linq;
using System.Linq.Expressions;
using EntityFramework.Utilities;
using log4net;

public interface IHasPrimaryKey
{
    long PrimaryKey { get; set; }

    //this is needed because EntityFramework.Utilities doesn't reveal this internal information to the SQL provider.
    // https://github.com/MikaelEliasson/EntityFramework.Utilities/blob/a5abc50b7367d64ca541b6e7e2e6018a500b6d8d/EntityFramework.Utilities/EntityFramework.Utilities/EFBatchOperation.cs#L119
    string PrimaryKeyPropertyName { get; }
}

public static class BulkOperationsExtensions
{
    static BulkOperationsExtensions()
    {
        SetProvider();
    }

    private static void SetProvider()
    {
        Configuration.Providers.Clear();
        Configuration.Providers.Add(new BulkInsertEntityFrameworkUtitlitiesSqlProvider());
    }

    private static readonly object SyncLock = new object();

    private static void SyncronizedInitContextMetadata(DbContext context)
    {
        lock (SyncLock)
        {
            //this method is not thread safe, so we lock it.
            EfMappingFactory.GetMappingsForContext(context);
        }
    }

    public static void InsertAll<TEntity>(this DbContext context, IDbSet<TEntity> dbSet, IList<TEntity> items,
        BulkFailurePolicy failurePolicy = BulkFailurePolicy.AllOrNothing)
        where TEntity : class
    {
        SyncronizedInitContextMetadata(context);
        var wrapper = new BulkInsertionCollectionMetadata<TEntity>(items) { SetGeneratedRecordIds = false, };
        RetryOnFailure(wrapper, data => EFBatchOperation.For(context, dbSet).InsertAll(data), failurePolicy);
    }

    public static void InsertAllAndReturnRecordIds<TEntity>(this DbContext context, IDbSet<TEntity> dbSet,
        IList<TEntity> items,
        BulkFailurePolicy failurePolicy = BulkFailurePolicy.AllOrNothing) where TEntity : class, IHasPrimaryKey
    {
        SyncronizedInitContextMetadata(context);
        var wrapper = new BulkInsertionCollectionMetadata<TEntity>(items) { SetGeneratedRecordIds = true, };
        RetryOnFailure(wrapper, data => EFBatchOperation.For(context, dbSet).InsertAll(data), failurePolicy);
    }

    private const int MaxRetries = 80;
    private const int MaxRowsPerInsertAction = 3000;

    private static void RetryOnFailure<TEntity>(BulkInsertionCollectionMetadata<TEntity> items,
        Action<BulkInsertionCollectionMetadata<TEntity>> insertAction, BulkFailurePolicy failurePolicy,
        int retryCounter = 0)
    {
        // If there are too many items, split the load into smaller chunks.
        // This is only allowed if we don't have to roll back after failures.
        if (failurePolicy == BulkFailurePolicy.IgnoreFailures && items.Count > MaxRowsPerInsertAction)
        {
            var split = items.Split();
            foreach (var smallerCollection in split)
            {
                RetryOnFailure(smallerCollection, insertAction, failurePolicy, retryCounter);
            }
            return;
        }
        try
        {
            insertAction(items);
        }
        //We are catching all exceptions. We can also get InvalidOperationException or others, not just SqlException.
        catch (Exception exception) when (exception.Message.Contains("deadlock"))
        {
            Log.Error($"Encountered deadlock while ingesting records of type {typeof(TEntity).Name} to database.", exception);

            //never ignore deadlock related failures, we will throw this exception.
            if (retryCounter > MaxRetries) throw;
            RetryOnFailure(items, insertAction, failurePolicy, retryCounter + 1);
        }
        //any other exception - split and retry
        catch (Exception exception)
        {
            if (items.Count == 1 && failurePolicy == BulkFailurePolicy.IgnoreFailures)
            {
                Log.Error($"Problem ingesting record of type {typeof(TEntity).Name} to database.", exception);
                return;
            }
            if (failurePolicy == BulkFailurePolicy.AllOrNothing)
            {
                Log.Error($"Problem ingesting records of type {typeof(TEntity).Name} to database.", exception);
                throw;
            }
            //else - split the list to two
            var split = items.Split();
            foreach (var smallerCollection in split)
            {
                RetryOnFailure(smallerCollection, insertAction, failurePolicy, retryCounter + 1);
            }
        }
    }

    private static readonly ILog Log = LogManager.GetLogger(typeof(BulkOperationsExtensions));
}

public enum BulkFailurePolicy
{
    /// <summary>
    /// The collection will be inserted as a whole, or all rows will be rolled back and an exception will be thrown.
    /// </summary>
    AllOrNothing,
    /// <summary>
    /// On failure, the collection will be divided and retried, until only individual rows fail.
    /// </summary>
    IgnoreFailures,
}
/// <summary>
/// This class is used because we cannot pass additional parameters (such as whether to
/// return the generated IDs) through EFBatchOperation to the SQL provider.
/// </summary>
internal class BulkInsertionCollectionMetadata<T> : IList<T>
{
    private readonly IList<T> _items;

    public BulkInsertionCollectionMetadata(IList<T> items)
    {
        _items = items;
    }

    public bool SetGeneratedRecordIds { get; set; }

    public IEnumerator<T> GetEnumerator() => _items.GetEnumerator();

    IEnumerator IEnumerable.GetEnumerator() => ((IEnumerable)_items).GetEnumerator();

    public void Add(T item) => _items.Add(item);

    public void Clear() => _items.Clear();

    public bool Contains(T item) => _items.Contains(item);

    public void CopyTo(T[] array, int arrayIndex) => _items.CopyTo(array, arrayIndex);

    public bool Remove(T item) => _items.Remove(item);

    public int Count => _items.Count;

    public bool IsReadOnly => _items.IsReadOnly;

    public int IndexOf(T item) => _items.IndexOf(item);

    public void Insert(int index, T item) => _items.Insert(index, item);

    public void RemoveAt(int index) => _items.RemoveAt(index);

    public T this[int index]
    {
        get => _items[index];
        set => _items[index] = value;
    }

    public BulkInsertionCollectionMetadata<T>[] Split()
    {
        var itemsMiddle = _items.Count / 2;
        return new[]
        {
            CopyWithOtherItems(_items.Take(itemsMiddle).ToList()),
            CopyWithOtherItems(_items.Skip(itemsMiddle).ToList()),
        };
    }

    private BulkInsertionCollectionMetadata<T> CopyWithOtherItems(IList<T> items)
        => new BulkInsertionCollectionMetadata<T>(items)
        {
            SetGeneratedRecordIds = SetGeneratedRecordIds
        };
}
/// <summary>
/// Overrides the provider to fix some bugs and add the ID-returning insert.
/// The original provider:
/// https://github.com/MikaelEliasson/EntityFramework.Utilities/blob/a5abc50b7367d64ca541b6e7e2e6018a500b6d8d/EntityFramework.Utilities/EntityFramework.Utilities/SqlQueryProvider.cs
/// </summary>
public class BulkInsertEntityFrameworkUtitlitiesSqlProvider : SqlQueryProvider, EntityFramework.Utilities.IQueryProvider
{
    public new void InsertItems<T>(IEnumerable<T> items, string schema, string tableName,
        IList<ColumnMapping> properties, DbConnection storeConnection, int? batchSize)
    {
        var con = (SqlConnection)storeConnection;
        var wrapperData = items as BulkInsertionCollectionMetadata<T>;
        bool shouldReturnRecordIds = wrapperData?.SetGeneratedRecordIds == true;
        if (shouldReturnRecordIds)
        {
            BulkInsertAllAndReturnIds(wrapperData, schema, tableName, properties, con, batchSize);
        }
        else
        {
            BulkInsertAll(items, schema, tableName, properties, con, batchSize, SqlBulkCopyOptions.KeepNulls);
        }
    }

    private static void BulkInsertAll<T>(IEnumerable<T> items, string schema, string tableName,
        IList<ColumnMapping> properties, SqlConnection connection, int? batchSize, SqlBulkCopyOptions insertOptions)
    {
        using (var reader = new EFDataReader<T>(items, properties))
        {
            if (connection.State != ConnectionState.Open)
            {
                connection.Open();
            }
            using (SqlBulkCopy copy = new SqlBulkCopy(connection.ConnectionString, insertOptions))
            {
                copy.BulkCopyTimeout = 0;
                copy.BatchSize = Math.Min(reader.RecordsAffected, batchSize ?? 15000); //default batch size
                if (tableName.StartsWith("#"))
                {
                    copy.DestinationTableName = tableName;
                }
                else if (!string.IsNullOrWhiteSpace(schema))
                {
                    copy.DestinationTableName = $"[{schema}].[{tableName}]";
                }
                else
                {
                    copy.DestinationTableName = "[" + tableName + "]";
                }

                copy.NotifyAfter = 0;

                foreach (var i in Enumerable.Range(0, reader.FieldCount))
                {
                    copy.ColumnMappings.Add(i, properties[i].NameInDatabase);
                }
                copy.WriteToServer(reader);
                copy.Close();
            }
        }
    }

    private static void BulkInsertAllAndReturnIds<T>(BulkInsertionCollectionMetadata<T> items, string schema,
        string tableName,
        IList<ColumnMapping> properties, SqlConnection connection, int? batchSize)
    {
        if (items.Count == 0) return;
        long dummyValue = -1000 - items.Count;
        //set dummy IDs
        foreach (var item in items)
        {
            ((IHasPrimaryKey)item).PrimaryKey = dummyValue;
            dummyValue++;
        }
        try
        {
            if (connection.State != ConnectionState.Open)
            {
                connection.Open();
            }

            //create dummy table.
            using (var tempTable = new TempTable(connection, tableName, schema))
            {
                var createTempTableSql = $"Select * Into {tempTable.TableName} From {tableName} Where 1 = 2";
                using (var command = new SqlCommand(createTempTableSql, connection))
                {
                    command.ExecuteNonQuery();
                }

                //bulk insert to temp table.
                BulkInsertAll(items, schema, tempTable.TableName, properties, connection, batchSize,
                    SqlBulkCopyOptions.KeepNulls | SqlBulkCopyOptions.KeepIdentity);

                //note: IsPrimaryKey is not populated in InsertAll 
                // https://github.com/MikaelEliasson/EntityFramework.Utilities/blob/a5abc50b7367d64ca541b6e7e2e6018a500b6d8d/EntityFramework.Utilities/EntityFramework.Utilities/EFBatchOperation.cs#L129

                string primaryKeyNameOnObject = ((IHasPrimaryKey)items.First()).PrimaryKeyPropertyName;
                var primaryKey = properties.Single(c => c.NameOnObject == primaryKeyNameOnObject);
                var otherColumns = properties.Where(p => p != primaryKey);
                var allValueColumns = string.Join(", ", otherColumns.Select(c => "[" + c.NameInDatabase + "]"));

                //insert to real table and get new IDs.
                //this guarantees the record IDs are generated in the right order.
                var migrateAndReturnIds =
                    $@"
                    insert into {tableName} ({allValueColumns})
                    OUTPUT inserted.{primaryKey.NameInDatabase}
                    select {allValueColumns} from {tempTable.TableName} temp
                    order by temp.{primaryKey.NameInDatabase}
                    ";

                var newlyGeneratedIds = new List<long>(items.Count);
                using (var migrateDataCommand = new SqlCommand(migrateAndReturnIds, connection)
                {
                    CommandTimeout = 0
                })
                using (var recordIdReader = migrateDataCommand.ExecuteReader())
                {
                    while (recordIdReader.Read())
                    {
                        var newId = recordIdReader.GetInt64(0);
                        newlyGeneratedIds.Add(newId);
                    }
                }
                //set IDs on entities.
                if (newlyGeneratedIds.Count != items.Count)
                {
                    throw new MissingPrimaryKeyException("There are fewer generated record IDs than the " +
                                                            "number of items inserted to the database.");
                }
                //the order in which the IDs are returned is not guaranteed, but the values were
                //generated in the same order as the items in `items`, so sorting restores the mapping.
                newlyGeneratedIds.Sort();
                for (int i = 0; i < newlyGeneratedIds.Count; i++)
                {
                    ((IHasPrimaryKey)items[i]).PrimaryKey = newlyGeneratedIds[i];
                }
            }
        }
        finally
        {
            //make sure the ID is 0 if the row wasn't inserted.
            foreach (var item in items)
            {
                var entity = (IHasPrimaryKey)item;
                if (entity.PrimaryKey < 0) entity.PrimaryKey = 0;
            }
            }
        }
    }

    private static string GetPropertyName<T>(Expression<Func<T, object>> propertyLambda)
    {
        var temp = propertyLambda.Body;
        while (temp is UnaryExpression)
        {
            temp = (temp as UnaryExpression).Operand;
        }
        MemberExpression member = temp as MemberExpression;
        return member?.Member.Name;
    }

    public new void UpdateItems<T>(IEnumerable<T> items, string schema, string tableName,
        IList<ColumnMapping> properties, DbConnection storeConnection, int? batchSize,
        UpdateSpecification<T> updateSpecification)
    {
        var columnsToUpdate = updateSpecification.Properties.Select(GetPropertyName).ToDictionary(x => x);
        var filtered = properties.Where(p => columnsToUpdate.ContainsKey(p.NameOnObject) || p.IsPrimaryKey).ToList();
        var columns = filtered.Select(c => "[" + c.NameInDatabase + "] " + c.DataType);
        var pkConstraint = string.Join(", ",
            properties.Where(p => p.IsPrimaryKey).Select(c => "[" + c.NameInDatabase + "]"));

        var con = (SqlConnection)storeConnection;
        if (con.State != ConnectionState.Open)
        {
            con.Open();
        }
        var tempTable = new TempTable(con, tableName, schema);
        var createTempTableSql =
            $"CREATE TABLE {schema}.[{tempTable.TableName}]({string.Join(", ", columns)}, PRIMARY KEY ({pkConstraint}))";

        var setters = string.Join(",",
            filtered.Where(c => !c.IsPrimaryKey)
                .Select(c => "[" + c.NameInDatabase + "] = TEMP.[" + c.NameInDatabase + "]"));
        var pks =
            properties.Where(p => p.IsPrimaryKey)
                .Select(x => "ORIG.[" + x.NameInDatabase + "] = TEMP.[" + x.NameInDatabase + "]");
        var filter = string.Join(" and ", pks);
        var mergeCommand = string.Format(@"UPDATE [{0}]
            SET
                {3}
            FROM
                [{0}] ORIG
            INNER JOIN
                    [{1}] TEMP
            ON 
                {2}", tableName, tempTable.TableName, filter, setters);

        using (var createCommand = new SqlCommand(createTempTableSql, con))
        using (var mCommand = new SqlCommand(mergeCommand, con))
        using (tempTable)
        {
            createCommand.ExecuteNonQuery();
            InsertItems(items, schema, tempTable.TableName, filtered, storeConnection, batchSize);
            mCommand.ExecuteNonQuery();
        }
    }

    private static string GetTempTableName(string baseTableName)
        //consider using a real temp table ("#" prefix) - but that doesn't work with this bulk insert.
        => "temp_" + baseTableName + "_" + DateTime.UtcNow.ToString("yyyyMMdd_HHmmss_fff") + "_" +
            Guid.NewGuid().ToString("N");

    private class TempTable : IDisposable
    {
        private readonly SqlConnection _connection;
        private readonly string _dbSchema;
        public string TableName { get; }

        public TempTable(SqlConnection connection, string baseTableName, string dbSchema)
        {
            _connection = connection;
            _dbSchema = dbSchema;
            TableName = GetTempTableName(baseTableName);
        }

        public void Dispose()
        {
            try
            {
                using (var dCommand = new SqlCommand($"DROP table {_dbSchema}.[{TableName}]", _connection))
                {
                    dCommand.ExecuteNonQuery();
                }
            }
            catch
            {
                //Nothing to do. don't care.
            }
        }
    }
}

Usage

Your entity should implement the interface IHasPrimaryKey. Suppose you have this class:

public partial class Spaceship
{
    public virtual long SpaceshipId { get; set; }
}

I recommend using explicit interface implementation, to keep these properties hidden when writing queries.

partial class Spaceship : IHasPrimaryKey
{
    long IHasPrimaryKey.PrimaryKey
    {
        get => SpaceshipId;
        set => SpaceshipId = value;
    }

    string IHasPrimaryKey.PrimaryKeyPropertyName
        => nameof(SpaceshipId);
}

and then you can use:

context.InsertAllAndReturnRecordIds(context.Spaceships, 
    purchasedSpaceships, BulkFailurePolicy.IgnoreFailures);
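After the call returns, each entity in the list carries its database-generated ID (or 0 for rows that failed to insert under IgnoreFailures), so the IDs can be used right away. A small illustrative follow-up, reusing the Spaceship class from above:

```csharp
context.InsertAllAndReturnRecordIds(context.Spaceships,
    purchasedSpaceships, BulkFailurePolicy.IgnoreFailures);

// The generated IDs are now set on the entities themselves:
var insertedIds = purchasedSpaceships
    .Where(s => s.SpaceshipId != 0) // 0 means the row was skipped under IgnoreFailures
    .Select(s => s.SpaceshipId)
    .ToList();
```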

Results

The new code is significantly faster than native Entity Framework inserts, with up to a 93% reduction in insertion time:

[Chart: SqlBulkInsertTimes – comparison of insert durations]

5 thoughts on “Entity Framework 6 – Bulk Insert and Returning of Generated Primary Keys”

  1. Hi, good day.
    “Your entity should implement the interface IHasPrimaryKey”… how do I do this? Can you give an example? My entities are DB-first generated.
    E.g. I implemented this in a new class derived from my entity DataHistory, like so:

    
    public class DataHistoryPKExtension :
                DataHistory, EntityFramework.Utilities.IHasPrimaryKey
    {
        private long _pkey = 0;
        public long PrimaryKey
        {
            get
            {
                return _pkey;
            }
            set
            {
                _pkey = value;
            }
        }
    
        public string PrimaryKeyPropertyName
        {
            get
            {
                return "ID";
            }
        }
    
        public DataHistoryPKExtension(DataHistory element)
        {
            _pkey = element.ID;
        }
    }
    

    and then how do I put this back into the context? Does set.Attach() work on this? Thanks for any info you may have.

    • Hey!
      I’ve updated the post with a complete example. The best way is to use partial classes, and keep interface implementation separated from the model and mapping.
      As for attach – it should work. I never properly tried it. I like to have my Entity Framework contexts open for a very short time, just for a single operation, and open a new context for the next.
      In general, the Bulk Utilities methods use Entity Framework for metadata, but don’t play too nicely with attach or change-tracking, so I would expect at least some friction there.

      • 😀 thanks for the prompt reply sir. It turns out I didn’t need attach(): since this is an insert, there is nothing to attach to begin with ;P sorry, my mistake. I have made it work in my EF project, though not using partial classes. I will take note of your suggestion.
        I will try to update the part where this only uses long as the primary key, because I have tables with guid primary keys (it should be flexible for other types, but would require reflection, I guess). For now, this is great as it is, thanks.
