Hi,
Recently I've been tasked with creating an automated ETL process that pumps the data into tables based on the flat file name by reading a master mapping file. I've decided to go with SqlBulkCopy and everything seemed to be fine. IDataReader interface was implemented to read flat-files, SQL Server's meta-data provided with number of columns for a one-to-one data mapping, everything was working until I ran across the file that carried empty strings. SqlBulkCopy throws an Exception saying that "The given value of type String from the data source cannot be converted to type int of the specified target column.". End of story, it does not even care that the DB type for this column is INT NULL. I know that I can interpret meta-data further, extract data types for given columns, build a DataSet based on extracted information, re-cast the data from flat-files and get myself a nice strongly typed solution that will work, but I am a lazy guy who feels like his happiness was viciously lacerated by Microsoft, or my own incompetence if someone knows of a solution to my sudden problem. Thank you for your time.
List<String> fileNames;
DateTime startJobTime = DateTime.Now;
Console.WriteLine("---------------------------------------------");
Console.WriteLine("Start Time: " + startJobTime);
Console.WriteLine("---------------------------------------------");
using (SqlConnection sqlCon = new SqlConnection(sqlConnection))
{
try
{
sqlCon.Open();
sqlCon.ChangeDatabase(edwDBName);
// Get service information for staging job
UnivStage us = GetStagingJobInfo(jobName, sqlCon);
us.StartJobTime = startJobTime;
// Get a list of file names
fileNames = GetFileList(us, args);
if (fileNames.Count > 0)
{
// Truncate Staging Table
TruncateStagingTable(us, sqlCon);
// Close and dispose of sqlCon2 connection
sqlCon.Close();
Console.WriteLine("Processing files: ");
foreach (String fileName in fileNames)
Console.WriteLine(fileName);
Console.WriteLine();
}
else
{
Console.WriteLine("No files to process.");
Environment.Exit(0);
}
// Re-open Sql Connection
sqlCon.Open();
sqlCon.ChangeDatabase(stagingDBName);
foreach (String filePath in fileNames)
{
using (SqlTransaction sqlTran = sqlCon.BeginTransaction())
{
using (FlatFileReader ffReader = new FlatFileReader(filePath, us.Delimiter))
{
using (SqlBulkCopy sqlBulkCopy =
new SqlBulkCopy(sqlCon, SqlBulkCopyOptions.Default, sqlTran))
{
SqlConnection sqlCon2 = new SqlConnection(sqlConnection);
SetColumnList(sqlCon2, us, sqlBulkCopy);
sqlBulkCopy.BatchSize = 1000;
sqlBulkCopy.DestinationTableName =
us.StagingSchemaName + "." + us.StagingTableName;
sqlBulkCopy.WriteToServer(ffReader);
sqlTran.Commit();
sqlCon2.Close();
}
}
}
}
sqlCon.ChangeDatabase(edwDBName);
sqlCon.Close();
sqlCon.Open();
SetRowCount(us, sqlCon);
sqlCon.Close();
us.EndJobTime = DateTime.Now;
sqlCon.Open();
LogStagingProcess(us, sqlCon);
sqlCon.Close();
Console.WriteLine(us.ProcessedRowCount + " rows inserted.");
Console.WriteLine("---------------------------------------------");
Console.WriteLine("Success! End Time: " + us.EndJobTime);
Console.WriteLine("---------------------------------------------");
Console.ReadLine();
}
catch (SqlException e)
{
RenderExceptionMessagesAndExit(e,
"Exception have occured during an attempt to utilize SqlBulkCopy\n");
}
}