Alexander Kuznetsov

Learning PostgreSql: bulk loading data

In this post we shall start loading data in bulk.

For better insert performance, we shall load data into a table without constraints and indexes. This should sound familiar to anyone who has bulk loaded data into SQL Server. PostgreSQL has a bulk copy command, COPY, and it is very easy to invoke from C#. The following code feeds the output of a T-SQL stored procedure into a PostgreSQL table:

using (var pgTableTarget = new PgTableTarget(PgConnString, "Data.MyPgTable", GetColumns()))
using (var conn = new SqlConnection(connectionString))
{
    conn.Open();
    using (var command = conn.CreateCommand())
    {
        command.CommandText = "EXEC MyStoredProc";
        command.CommandType = CommandType.Text;
        command.CommandTimeout = 0; // 0 means no timeout; the load may run for a long time
        using (var dr = command.ExecuteReader())
        {
            // SQL Server type name of each result column, in ordinal order
            var columnTypes = SetFields(dr.GetSchemaTable());
            var adapter = new SqlDataReaderToPgTableAdapter(pgTableTarget);
            while (dr.Read())
            {
                for (var columnIndex = 0; columnIndex < columnTypes.Count; columnIndex++)
                {
                    adapter.AddValue(dataReader: dr, columnIndex: columnIndex, columnType: columnTypes[columnIndex]);
                }
                pgTableTarget.EndRow();
            }
        }
    }
}

(snip)

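// Ordinal of the schema table column that holds the SQL Server type name
private const int TYPE = 24;

// Returns the lower-cased type name of every column in the result set (needs System.Linq)
internal static List<string> SetFields(DataTable schema)
{
    return (from DataRow dataRow in schema.Rows select dataRow[TYPE].ToString().ToLower()).ToList();
}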
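
The ordinal 24 above is a magic number. Here is a minimal sketch of the same lookup done by column name instead. It rests on an assumption that the post does not state: that ordinal 24 of the SqlDataReader schema table is the column named DataTypeName. SchemaFields and SetFieldsByName are hypothetical names used only for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Data;
using System.Linq;

internal static class SchemaFields
{
    // Assumption: the schema table exposes the type name in a column called "DataTypeName".
    private const string TypeNameColumn = "DataTypeName";

    // Returns the lower-cased type name of every row in the schema table.
    internal static List<string> SetFieldsByName(DataTable schema)
    {
        // Fail early with a clear message if the assumed column is missing.
        if (!schema.Columns.Contains(TypeNameColumn))
            throw new ArgumentException("Schema table has no " + TypeNameColumn + " column.");

        return schema.Rows.Cast<DataRow>()
            .Select(row => row[TypeNameColumn].ToString().ToLowerInvariant())
            .ToList();
    }
}
```

Called as SchemaFields.SetFieldsByName(dr.GetSchemaTable()), this should produce the same list as SetFields for as long as the assumption holds, and it fails with a clear message rather than reading the wrong column if the schema layout differs.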
 

This code uses a couple of helper classes we developed ourselves. Here is a simple wrapper around the COPY command:

using System;
using System.Collections.Generic;
using System.Data;
using Npgsql;

namespace Drw.Qr.FinDb.PostgresDataLoad
{
    public class PgTableTarget : IDisposable
    {
        private readonly Npgsql.NpgsqlConnection _conn;
        private readonly NpgsqlCommand _command;
        private readonly NpgsqlCopySerializer _serializer;
        private readonly NpgsqlCopyIn _copyIn;

        public PgTableTarget(string connString, string tableName, IEnumerable<string> columns)
        {
            _conn = new NpgsqlConnection(connString);
            _conn.Open();
            _command = _conn.CreateCommand();
            var copyStr = string.Format("COPY {0}({1}) FROM STDIN", tableName, string.Join(",", columns));
            _command.CommandText = copyStr;
            _command.CommandType = CommandType.Text;
            _serializer = new NpgsqlCopySerializer(_conn);
            _copyIn = new NpgsqlCopyIn(_command, _conn, _serializer.ToStream);
            _copyIn.Start();
        }

        public void AddString(string value)
        {
            _serializer.AddString(value);
        }

        public void AddNull()
        {
            _serializer.AddNull();
        }

        public void AddInt32(int value)
        {
            _serializer.AddInt32(value);
        }

        public void AddNumber(double value)
        {
            _serializer.AddNumber(value);
        }

        public void EndRow()
        {
            _serializer.EndRow();
            _serializer.Flush();
        }

        public void Dispose()
        {
            _copyIn.End();
            _serializer.Flush();
            _serializer.Close();
            _command.Dispose();
            _conn.Dispose();
        }
    }
}
  

The following code is a straightforward adapter:


using System;
using System.Data.SqlClient;

namespace Drw.Qr.FinDb.PostgresDataLoad
{
    public class SqlDataReaderToPgTableAdapter
    {
        private readonly PgTableTarget _pgTableTarget;

        public SqlDataReaderToPgTableAdapter(PgTableTarget pgTableTarget)
        {
            _pgTableTarget = pgTableTarget;
        }

        // Copies one column value from the current reader row into the COPY stream,
        // choosing the conversion by the SQL Server type name of the column.
        public void AddValue(SqlDataReader dataReader, int columnIndex, string columnType)
        {
            if (dataReader.IsDBNull(columnIndex))
            {
                _pgTableTarget.AddNull();
                return;
            }
            switch (columnType)
            {
                case "varchar":
                case "char":
                    _pgTableTarget.AddString(dataReader.GetString(columnIndex));
                    break;
                case "decimal":
                    // Note: the cast to double can lose precision for values with many significant digits
                    _pgTableTarget.AddNumber((double)dataReader.GetDecimal(columnIndex));
                    break;
                case "int":
                    _pgTableTarget.AddInt32(dataReader.GetInt32(columnIndex));
                    break;
                case "smallint":
                    _pgTableTarget.AddInt32(dataReader.GetInt16(columnIndex));
                    break;
                case "real":
                    _pgTableTarget.AddNumber(dataReader.GetFloat(columnIndex));
                    break;
                case "float":
                    _pgTableTarget.AddNumber(dataReader.GetDouble(columnIndex));
                    break;
                default:
                    throw new ArgumentException("Not supported type: " + columnType);
            }
        }
    }
}
  

Although neither class supports every available type, together they cover all the types we need for this small project.

While I was typing this post, the code had already moved several million rows.

Published Monday, November 04, 2013 11:13 AM by Alexander Kuznetsov


Comments

 

Miki said:

First of all thanks for this great piece of work. I am trying to achieve similar goal and this snippet really helps. To have snippets in ready to run form, do you have them hosted in any online repositories?

- Miki Oracle

January 3, 2014 6:09 AM
 

AlexK said:

Miki,

I am glad you liked this. The code in this post is actually working in production. I have not uploaded it to GitHub yet.

January 3, 2014 8:16 PM
 

Miki said:

Great. Could you send the complete VSProj to my email, oracle(AT)linuxwaves(DOT)com? I am having some difficulty compiling this snippet.

January 6, 2014 4:55 AM


About Alexander Kuznetsov

Alex Kuznetsov has been working with object oriented languages, mostly C# and C++, as well as with databases for more than a decade. He has worked with Sybase, SQL Server, Oracle and DB2. He regularly blogs on sqlblog.com, mostly about database unit testing, defensive programming, and query optimization. Alex has written a book entitled "Defensive Database Programming with Transact-SQL" and several articles on simple-talk.com and devx.com. Currently he works as an agile developer.
