One Way to Create a Parquet File from SQL Server Data

Yesterday I was coding away, deep into using C# to decompress and parse Excel XML so I could read data from a specific worksheet in a workbook on a server without Excel objects installed, when I saw a tweet asking how to get data from SQL Server into a Parquet file without leaving the premises.

For those of you who do not already know…

I Don’t Switch Contexts Well

I wasn’t wrong.
I just wasn’t thinking… yet.

Plus (blogger tip), if you want your blog to help people, be on the lookout for statements like, “I can’t find this anywhere…” When you find one, blog about that topic.

I searched and found some promising Parquet SSIS components available from CData Software and passed that information along. I shared my inexperience in exporting to Parquet format and asked a few friends how they’d done it.

I thought: How many times have I demonstrated Azure Data Factory and clicked right past file format selection without giving Parquet a second thought? Too many times. It was time to change that.

The request in the tweet is threefold:

  1. Read data from SQL Server.
  2. Write the data to a Parquet file.
  3. Stay on-premises.

Steps 1 and 2

Steps 1 and 2 can be accomplished in Azure Data Factory without much hassle. The steps are:

  1. Create a pipeline.
  2. Add a Copy Data activity.
  3. Configure a SQL Server data source.
  4. Configure a Parquet sink.

I used a storage account configured for Azure Data Lake as my target, and you can find details on configuring ADLS and using Parquet with ADF in the Microsoft documentation.

Step 3

Staying on-premises was the tricksy part of this request. I decided to create a C# (.Net Framework) console application to test the concepts. I also decided to use the Parquet.Net NuGet package.

I found some helpful C# in Darren Fuller's post titled Extracting data from SQL into Parquet. I modified Darren's code some and was able to export the Person.Person table from the AdventureWorks2019 database to a Parquet file using the following code:

using System;
using System.Collections;
using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Parquet;
using Parquet.Data;
using DataColumn = System.Data.DataColumn;
using DBNull = System.DBNull;

namespace ExportToParquetConsole
{
    public class Program
    {
        static void Main(string[] args)
        {
            //WriteTestFile();
            DataTable dt = GetData();
            var fields = GenerateSchema(dt);
            const int RowGroupSize = 100;
            const string OutputFilePath = "e:\\Andy\\data\\person.parquet";

            // Open the output file for writing
            using (var stream = File.Open(OutputFilePath, FileMode.Create, FileAccess.Write))
            {
                using (var writer = new ParquetWriter(new Schema(fields), stream))
                {
                    var startRow = 0;

                    // Keep on creating row groups until we run out of data
                    while (startRow < dt.Rows.Count)
                    {
                        using (var rgw = writer.CreateRowGroup())
                        {
                            // Data is written to the row group column by column
                            for (var i = 0; i < dt.Columns.Count; i++)
                            {
                                var columnIndex = i;

                                // Determine the target data type for the column
                                var targetType = dt.Columns[columnIndex].DataType;
                                if (targetType == typeof(DateTime)) targetType = typeof(DateTimeOffset);

                                // Generate the value type, this is to ensure it can handle null values
                                var valueType = targetType.IsClass
                                    ? targetType
                                    : typeof(Nullable<>).MakeGenericType(targetType);

                                // Create a list to hold values of the required type for the column
                                var list = (IList)typeof(List<>)
                                    .MakeGenericType(valueType)
                                    .GetConstructor(Type.EmptyTypes)
                                    .Invoke(null);

                                // Get the data to be written to the parquet stream
                                foreach (var row in dt.AsEnumerable().Skip(startRow).Take(RowGroupSize))
                                {
                                    // Check if the value is null, if so then add a null value
                                    if (row[columnIndex] == null || row[columnIndex] == DBNull.Value)
                                    {
                                        list.Add(null);
                                    }
                                    else
                                    {
                                        // Add the value to the list, but if it's a DateTime then create it as a DateTimeOffset first
                                        list.Add(dt.Columns[columnIndex].DataType == typeof(DateTime)
                                            ? new DateTimeOffset((DateTime)row[columnIndex])
                                            : row[columnIndex]);
                                    }
                                }

                                // Copy the list values to an array, because the WriteColumn method expects an Array
                                var valuesArray = Array.CreateInstance(valueType, list.Count);
                                list.CopyTo(valuesArray, 0);

                                // Write the column
                                rgw.WriteColumn(new Parquet.Data.DataColumn(fields[i], valuesArray));
                            }
                        }

                        startRow += RowGroupSize;
                    }
                }
            }
        }

        public static DataTable GetData()
        {
            string connectionString = "Data Source=vDemo19\\Demo;Initial Catalog=AdventureWorks2019;Integrated Security=True;";
            SqlConnection conn = new SqlConnection(connectionString);
            conn.Open();
            string query = "Select BusinessEntityID, FirstName, MiddleName, LastName From Person.Person";
            SqlCommand cmd = new SqlCommand(query, conn);

            DataTable dt = new DataTable();
            dt.Load(cmd.ExecuteReader());
            conn.Close();
            return dt;
        }

        private static List<DataField> GenerateSchema(DataTable dt)
        {
            var fields = new List<DataField>(dt.Columns.Count);

            foreach (DataColumn column in dt.Columns)
            {
                // Attempt to parse the type of the column to a parquet data type
                var success = Enum.TryParse<DataType>(column.DataType.Name, true, out var type);

                // If the parse was not successful and its source is a DateTime then use a DateTimeOffset, otherwise default to a string
                if (!success && column.DataType == typeof(DateTime))
                {
                    type = DataType.DateTimeOffset;
                }
                else if (!success)
                {
                    type = DataType.String;
                }

                fields.Add(new DataField(column.ColumnName, type));
            }

            return fields;
        }

        static void WriteTestFile()
        {
            // Create data columns with schema metadata and the data you need
            var parColBusinessEntityID = new Parquet.Data.DataColumn(
                new DataField<int>("BusinessEntityID"),
                new int[] { 1, 2 });

            var parColLastName = new Parquet.Data.DataColumn(
                new DataField<string>("LastName"),
                new string[] { "Sánchez", "Duffy" });

            var parColFirstName = new Parquet.Data.DataColumn(
                new DataField<string>("FirstName"),
                new string[] { "Ken", "Terri" });

            var parColMiddleName = new Parquet.Data.DataColumn(
                new DataField<string>("MiddleName"),
                new string[] { "J", "Lee" });

            // Create the file schema
            var schema = new Schema(parColBusinessEntityID.Field
                , parColFirstName.Field
                , parColMiddleName.Field
                , parColLastName.Field);

            using (Stream fileStream = System.IO.File.Create("e:\\Andy\\data\\persontest.parquet"))
            {
                using (var parquetWriter = new ParquetWriter(schema, fileStream))
                {
                    // Create a new row group in the file
                    using (ParquetRowGroupWriter groupWriter = parquetWriter.CreateRowGroup())
                    {
                        groupWriter.WriteColumn(parColBusinessEntityID);
                        groupWriter.WriteColumn(parColFirstName);
                        groupWriter.WriteColumn(parColMiddleName);
                        groupWriter.WriteColumn(parColLastName);
                    }
                }
            }
        }
    }
}

I apologize for the formatting. You should be able to copy and paste this code into a C# (.Net Framework) console app and have at it. The code can also be adapted for an SSIS Script Task.
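If you go the SSIS route, the snippet below is only a rough sketch of how the Script Task's Main method might wrap the same logic; I have not tested it here. It assumes the Parquet.Net assembly can be loaded by the Script Task at runtime (which typically means installing it to the GAC or handling assembly resolution yourself), and ExportToParquet() is a hypothetical placeholder for the GetData / GenerateSchema / row-group-writing code shown above.

using System;
using Microsoft.SqlServer.Dts.Runtime;

namespace ST_ExportToParquet
{
    [Microsoft.SqlServer.Dts.Tasks.ScriptTask.SSISScriptTaskEntryPointAttribute]
    public partial class ScriptMain : Microsoft.SqlServer.Dts.Tasks.ScriptTask.VSTARTScriptObjectModelBase
    {
        public void Main()
        {
            try
            {
                // Hypothetical wrapper around the export code from the console app above.
                ExportToParquet();
                Dts.TaskResult = (int)ScriptResults.Success;
            }
            catch (Exception ex)
            {
                // Surface the failure to the SSIS runtime and fail the task
                Dts.Events.FireError(0, "Export to Parquet", ex.Message, string.Empty, 0);
                Dts.TaskResult = (int)ScriptResults.Failure;
            }
        }

        private void ExportToParquet()
        {
            // Paste the GetData(), GenerateSchema(), and file-writing logic here,
            // adjusting the connection string and output path (for example, from SSIS variables).
        }

        // The Script Task project template generates this enum for you in a #region block.
        enum ScriptResults
        {
            Success = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Success,
            Failure = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Failure
        };
    }
}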

Conclusion

I tested the output Parquet file by uploading it to Azure Blob Storage, creating a test target table and a test ADF pipeline, and then loading the data from the file into the table.

The load succeeded and a test query showed my data.
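If you want to sanity-check the file without a round trip to Azure, Parquet.Net can also read it back locally. Here is a small sketch along the same lines as the export code (the path is just an example); it opens the file and prints the row-group and column counts:

using System;
using System.IO;
using Parquet;
using Parquet.Data;

namespace ReadParquetConsole
{
    public class Program
    {
        static void Main(string[] args)
        {
            // Example path only - point this at the file written by the export code above
            const string InputFilePath = "e:\\Andy\\data\\person.parquet";

            using (Stream fileStream = File.OpenRead(InputFilePath))
            using (var parquetReader = new ParquetReader(fileStream))
            {
                DataField[] dataFields = parquetReader.Schema.GetDataFields();
                Console.WriteLine($"Row groups: {parquetReader.RowGroupCount}");

                for (int g = 0; g < parquetReader.RowGroupCount; g++)
                {
                    using (ParquetRowGroupReader groupReader = parquetReader.OpenRowGroupReader(g))
                    {
                        Console.WriteLine($"Row group {g}: {groupReader.RowCount} rows");

                        foreach (DataField field in dataFields)
                        {
                            // Read one column of the row group back into memory
                            DataColumn column = groupReader.ReadColumn(field);
                            Console.WriteLine($"  {field.Name}: {column.Data.Length} values");
                        }
                    }
                }
            }
        }
    }
}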

I hope this helps.

Peace.

 

Andy Leonard

andyleonard.blog

Christian, husband, dad, grandpa, Data Philosopher, Data Engineer, SSIS and Biml guy. I was cloud before cloud was cool. :{>

2 thoughts on “One Way to Create a Parquet File from SQL Server Data”

  1. We have used ‘external tables’ (aka PolyBase) to extract data into Parquet files on-prem. Once the external table (Parquet file) is defined, it’s just a simple INSERT INTO… a lot less coding required.
