Alexander Kuznetsov

Learning PostgreSql: bulk loading data

In this post we shall start loading data in bulk.

For better insert performance, we shall load data into a table without constraints or indexes. This should sound familiar to anyone who has bulk loaded data into SQL Server. PostgreSQL has a bulk copy command, COPY, and it is very easy to invoke from C#. The following code feeds the output of a T-SQL stored procedure into a PostgreSQL table:

using (var pgTableTarget = new PgTableTarget(PgConnString, "Data.MyPgTable", GetColumns()))
using (var conn = new SqlConnection(connectionString))
{
    conn.Open();
    using (var command = conn.CreateCommand())
    {
        command.CommandText = "EXEC MyStoredProc";
        command.CommandType = CommandType.Text;
        // A timeout of 0 means no limit: wait for as long as the procedure runs.
        command.CommandTimeout = 0;
        using (var dr = command.ExecuteReader())
        {
            // One SQL Server type name per result column, in column order.
            var columnTypes = SetFields(dr.GetSchemaTable());
            var adapter = new SqlDataReaderToPgTableAdapter(pgTableTarget);
            while (dr.Read())
            {
                for (var columnIndex = 0; columnIndex < columnTypes.Count; columnIndex++)
                {
                    adapter.AddValue(dataReader: dr, columnIndex: columnIndex, columnType: columnTypes[columnIndex]);
                }
                pgTableTarget.EndRow();
            }
        }
    }
}

(snip)

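// Ordinal of the type-name column (values such as "varchar" or "int")
// in the schema table returned by GetSchemaTable().
private const int TYPE = 24;

internal static List<string> SetFields(DataTable schema)
{
    return (from DataRow dataRow in schema.Rows select dataRow[TYPE].ToString().ToLower()).ToList();
}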
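The constant 24 above is a positional index into the schema table. As a possible alternative, the same list could be built by looking the column up by name. The sketch below assumes that the column at ordinal 24 is the one ADO.NET names DataTypeName; the class SchemaTypeNames, the method GetTypeNames, and the hand-built DataTable standing in for dr.GetSchemaTable() are hypothetical and not part of the original code.

```csharp
using System;
using System.Collections.Generic;
using System.Data;
using System.Linq;

public static class SchemaTypeNames
{
    // Assumption: the schema table exposes a column named "DataTypeName".
    private const string TypeNameColumn = "DataTypeName";

    // Returns one lower-cased type name per schema row, in row order.
    public static List<string> GetTypeNames(DataTable schema)
    {
        return schema.Rows
            .Cast<DataRow>()
            // ToLowerInvariant keeps the result independent of the current culture.
            .Select(row => row[TypeNameColumn].ToString().ToLowerInvariant())
            .ToList();
    }

    public static void Main()
    {
        // Hand-built stand-in for dr.GetSchemaTable(); a real schema table has many more columns.
        var schema = new DataTable();
        schema.Columns.Add("ColumnName", typeof(string));
        schema.Columns.Add(TypeNameColumn, typeof(string));
        schema.Rows.Add("Id", "int");
        schema.Rows.Add("Name", "VarChar");

        Console.WriteLine(string.Join(",", GetTypeNames(schema)));
    }
}
```

Looking the column up by name costs a little more per schema row, but it does not depend on the column order of the schema table.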
 

This code uses a couple of helper classes we developed ourselves. Here is a simple wrapper around the COPY command, built on Npgsql's NpgsqlCopyIn and NpgsqlCopySerializer:

using System;
using System.Collections.Generic;
using System.Data;
using Npgsql;

namespace Drw.Qr.FinDb.PostgresDataLoad
{
    public class PgTableTarget : IDisposable
    {
        private readonly NpgsqlConnection _conn;
        private readonly NpgsqlCommand _command;
        private readonly NpgsqlCopySerializer _serializer;
        private readonly NpgsqlCopyIn _copyIn;

        public PgTableTarget(string connString, string tableName, IEnumerable<string> columns)
        {
            _conn = new NpgsqlConnection(connString);
            _conn.Open();
            _command = _conn.CreateCommand();
            // Produces a statement of the form: COPY table(col1,col2) FROM STDIN
            var copyStr = string.Format("COPY {0}({1}) FROM STDIN", tableName, string.Join(",", columns));
            _command.CommandText = copyStr;
            _command.CommandType = CommandType.Text;
            _serializer = new NpgsqlCopySerializer(_conn);
            _copyIn = new NpgsqlCopyIn(_command, _conn, _serializer.ToStream);
            _copyIn.Start();
        }

        public void AddString(string value)
        {
            _serializer.AddString(value);
        }

        public void AddNull()
        {
            _serializer.AddNull();
        }

        public void AddInt32(int value)
        {
            _serializer.AddInt32(value);
        }

        public void AddNumber(double value)
        {
            _serializer.AddNumber(value);
        }

        // Terminates the current row and flushes it to the COPY stream.
        public void EndRow()
        {
            _serializer.EndRow();
            _serializer.Flush();
        }

        // Ends the COPY operation, then releases the serializer, command and connection.
        public void Dispose()
        {
            _copyIn.End();
            _serializer.Flush();
            _serializer.Close();
            _command.Dispose();
            _conn.Dispose();
        }
    }
}
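The constructor above splices the table and column names into the COPY statement as they are. PostgreSQL folds unquoted identifiers to lower case, so a mixed-case name such as Data.MyPgTable resolves only if the schema and table were created with unquoted names. If they were created with quoted, case-sensitive names, each part has to be wrapped in double quotes. The following is a minimal sketch of a helper for that case, not the code we run: CopyCommandBuilder, QuoteIdentifier and BuildCopyCommand are hypothetical names, and the sketch assumes that a dot in the table name only ever separates schema from table.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class CopyCommandBuilder
{
    // Wraps an identifier in double quotes, doubling any embedded double quote.
    public static string QuoteIdentifier(string identifier)
    {
        return "\"" + identifier.Replace("\"", "\"\"") + "\"";
    }

    // Builds a statement of the form: COPY "schema"."table"("col1","col2") FROM STDIN
    public static string BuildCopyCommand(string tableName, IEnumerable<string> columns)
    {
        // Assumption: dots in tableName only separate the schema from the table.
        var quotedTable = string.Join(".", tableName.Split('.').Select(part => QuoteIdentifier(part)));
        var quotedColumns = string.Join(",", columns.Select(column => QuoteIdentifier(column)));
        return string.Format("COPY {0}({1}) FROM STDIN", quotedTable, quotedColumns);
    }

    public static void Main()
    {
        // Prints: COPY "Data"."MyPgTable"("Id","Name") FROM STDIN
        Console.WriteLine(BuildCopyCommand("Data.MyPgTable", new[] { "Id", "Name" }));
    }
}
```

Quoting is only appropriate when the target objects really have case-sensitive names; against a table created with unquoted names, the quoted form would fail to resolve.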
  

The following code is a straightforward adapter:


using System;
using System.Data.SqlClient;

namespace Drw.Qr.FinDb.PostgresDataLoad
{
    public class SqlDataReaderToPgTableAdapter
    {
        private readonly PgTableTarget _pgTableTarget;

        public SqlDataReaderToPgTableAdapter(PgTableTarget pgTableTarget)
        {
            _pgTableTarget = pgTableTarget;
        }

        public void AddValue(SqlDataReader dataReader,
                             int columnIndex, string columnType)
        {
            if (dataReader.IsDBNull(columnIndex))
            {
                _pgTableTarget.AddNull();
                return;
            }
            switch (columnType)
            {
                case "varchar":
                case "char":
                    _pgTableTarget.AddString(dataReader.GetString(columnIndex));
                    break;
                case "decimal":
                    _pgTableTarget.AddNumber((double)dataReader.GetDecimal(columnIndex));
                    break;
                case "int":
                    _pgTableTarget.AddInt32(dataReader.GetInt32(columnIndex));
                    break;
                case "smallint":
                    _pgTableTarget.AddInt32(dataReader.GetInt16(columnIndex));
                    break;
                case "real":
                    _pgTableTarget.AddNumber(dataReader.GetFloat(columnIndex));
                    break;
                case "float":
                    _pgTableTarget.AddNumber(dataReader.GetDouble(columnIndex));
                    break;
                default:
                    throw new ArgumentException("Not supported type: " + columnType);
            }
        }
    }
}
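One detail of the adapter is worth keeping in mind: the decimal branch converts the value to double before passing it to AddNumber, and a double carries only about 15 to 17 significant decimal digits. For columns holding more digits than that, the conversion is lossy. The self-contained sketch below does not touch either database; it only shows a way to check whether a given value survives the conversion. The class DecimalPrecisionDemo and the method SurvivesDoubleRoundTrip are hypothetical names introduced for illustration.

```csharp
using System;

public static class DecimalPrecisionDemo
{
    // True when converting to double and back yields the original decimal.
    public static bool SurvivesDoubleRoundTrip(decimal value)
    {
        return (decimal)(double)value == value;
    }

    public static void Main()
    {
        // A short value converts without loss.
        Console.WriteLine(SurvivesDoubleRoundTrip(1.5m));
        // A 28-digit value does not fit into a double and comes back changed.
        Console.WriteLine(SurvivesDoubleRoundTrip(0.1234567890123456789012345678m));
    }
}
```

Whether this matters depends on the precision of the source columns; for typical prices and quantities with a handful of digits, the conversion is harmless.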
  

Although neither class supports every available type, together they cover all the types we need for this small project.

While I was typing this post, the code had already moved several million rows.

Published Monday, November 4, 2013 11:13 AM by Alexander Kuznetsov

Comments

 

Miki said:

First of all thanks for this great piece of work. I am trying to achieve similar goal and this snippet really helps. To have snippets in ready to run form, do you have them hosted in any online repositories?

- Miki Oracle

January 3, 2014 6:09 AM
 

AlexK said:

Miki,

I am glad you liked this. The code in this post is actually working in production. I have not uploaded it to GitHub yet.

January 3, 2014 8:16 PM
 

Miki said:

Great. Would it be possible to receive the complete VS project by email at oracle(AT)linuxwaves(DOT)com? I am having some difficulty compiling this snippet.

January 6, 2014 4:55 AM
 

sebastian said:

hello i have a problem

ERROR: schema "Data" does not exist

SQL state: 3F000

May 26, 2015 10:10 AM
 

Allan Munro said:

Thanks it worked perfectly. :)

June 4, 2015 1:12 AM

About Alexander Kuznetsov

Alex Kuznetsov has been working with object oriented languages, mostly C# and C++, as well as with databases for more than a decade. He has worked with Sybase, SQL Server, Oracle and DB2. He regularly blogs on sqlblog.com, mostly about database unit testing, defensive programming, and query optimization. Alex has written a book entitled "Defensive Database Programming with Transact-SQL" and several articles on simple-talk.com and devx.com. Currently he works as an agile developer.
