程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 數據庫知識 >> Oracle數據庫 >> Oracle教程 >> [Oracle] Bulk Insert Data,oraclebulk

[Oracle] Bulk Insert Data,oraclebulk

編輯:Oracle教程

[Oracle] Bulk Insert Data,oraclebulk


命名空間:Oracle.DataAccess.Client

組件:Oracle.DataAccess.dll(2.112.1.0)

ODP.NET 版本:ODP.NET for .NET Framework 2.0 或 ODP.NET for .NET Framework 4

工具:Microsoft Visual Studio Ultimate 2013 + Oracle SQL Developer 1.5.5 + Oracle Database 11g Enterprise Edition 11.2.0.1.0(32位) + TNS for 32-bit Windows 11.2.0.1.0

 

方式一:ArrayBind

當插入一條數據時,SQL語句如下:

INSERT INTO table_name VALUES (:col1, :col2, :col3, :col4, :col5)
  1 public void InsertDataRow(Dictionary<string, object> dataRow)
  2 {
  3 	StringBuilder sbCmdText = new StringBuilder();
  4 	sbCmdText.AppendFormat("INSERT INTO {0}(", m_TableName);
  5 	sbCmdText.Append(string.Join(",", dataRow.Keys.ToArray()));
  6 	sbCmdText.Append(") VALUES (");
  7 	sbCmdText.Append(":" + string.Join(",:", dataRow.Keys.ToArray()));
  8 	sbCmdText.Append(")");
  9 
 10 	using (OracleConnection conn = new OracleConnection())
 11 	{
 12 		using (OracleCommand cmd = conn.CreateCommand())
 13 		{
 14 			cmd.CommandType = CommandType.Text;
 15 			cmd.CommandText = sbCmdText.ToString();
 16 			OracleParameter parameter = null;
 17 			OracleDbType dbType = OracleDbType.Object;
 18 			foreach (string colName in dataRow.Keys)
 19 			{
 20 				dbType = GetOracleDbType(dataRow[colName]);
 21 				parameter = new OracleParameter(colName, dbType);
 22 				parameter.Direction = ParameterDirection.Input;
 23 				parameter.OracleDbTypeEx = dbType;
 24 				parameter.Value = dataRow[colName];
 25 				cmd.Parameters.Add(parameter);
 26 			}
 27 			conn.Open();
 28 			int result = cmd.ExecuteNonQuery();
 29 		}
 30 	}
 31 }

此時,每一個 OracleParameter 的 Value 值都賦予單個字段的 一個具體值,這種也是最為傳統的插入數據的方法。

Oracle V6 中 OCI 編程接口加入了數組接口特性。

當采用 ArrayBind 時,OraleParameter 的 Value 值則是賦予單個字段的 一個數組,即多條數據的該字段組合成的一個數組。此時 Oracle 僅需要執行一次 SQL 語句,即可在內存中批量解析並導入數據,減少程序與數據庫之間來回的操作,其優點就是數據導入的總體時間明顯減少,尤其是進程占用CPU的時間。

如果數據源是 DataTable 類型,首先把 DataTable 數據源,轉換成 object[][] 類型,然後綁定 OracleParameter 的 Value 值為對應字段的一個 Object[] 數組即可;參考代碼如下:

  1 /// <summary>
  2 /// 批量插入大數據量
  3 /// </summary>
  4 /// <param name="columnData">列名-列數據字典</param>
  5 /// <param name="dataCount">數據量</param>
  6 /// <returns>插入數據量</returns>
  7 public int InsertBigData(Dictionary<string, object> columnData, int dataCount)
  8 {
  9 	int result = 0;
 10 	if (columnData == null || columnData.Count < 1)
 11 	{
 12 		return result;
 13 	}
 14 	string[] colHeaders = columnData.Keys.ToArray();
 15 	StringBuilder sbCmdText = new StringBuilder();
 16 	if (columnData.Count > 0)
 17 	{
 18 		// 拼接INSERT的SQL語句
 19 		sbCmdText.AppendFormat("INSERT INTO {0}(", m_TableName);
 20 		sbCmdText.Append(string.Join(",", colHeaders));
 21 		sbCmdText.Append(") VALUES (");
 22 		sbCmdText.Append(m_ParameterPrefix + string.Join("," + m_ParameterPrefix, colHeaders));
 23 		sbCmdText.Append(")");
 24 		OracleConnection connection = null;
 25 		try
 26 		{
 27 			connection = new OracleConnection(GetConnectionString());
 28 			using (OracleCommand command = connection.CreateCommand())
 29 			{
 30 				command.ArrayBindCount = dataCount;
 31 				command.BindByName = true;
 32 				command.CommandType = CommandType.Text;
 33 				command.CommandText = sbCmdText.ToString();
 34 				command.CommandTimeout = 1800;
 35 				OracleParameter parameter;
 36 				OracleDbType dbType = OracleDbType.Object;
 37 				foreach (string colName in colHeaders)
 38 				{
 39 					dbType = GetOracleDbType(columnData[colName]);
 40 					parameter = new OracleParameter(colName, dbType);
 41 					parameter.Direction = ParameterDirection.Input;
 42 					parameter.OracleDbTypeEx = dbType;
 43 					parameter.Value = columnData[colName];
 44 					command.Parameters.Add(parameter);
 45 				}
 46 				connection.Open();
 47 				OracleTransaction trans = connection.BeginTransaction();
 48 				try
 49 				{
 50 					command.Transaction = trans;
 51 					result = command.ExecuteNonQuery();
 52 					trans.Commit();
 53 				}
 54 				catch (Exception ex)
 55 				{
 56 					trans.Rollback();
 57 					throw ex;
 58 				}
 59 			}
 60 		}
 61 		finally
 62 		{
 63 			if (connection != null)
 64 			{
 65 				connection.Close();
 66 				connection.Dispose();
 67 			}
 68 			GC.Collect();
 69 			GC.WaitForFullGCComplete();
 70 		}
 71 	}
 72 	return result;
 73 }
  1 /// <summary>
  2 /// 根據數據類型獲取OracleDbType
  3 /// </summary>
  4 /// <param name="value">數據</param>
  5 /// <returns>數據的Oracle類型</returns>
  6 private static OracleDbType GetOracleDbType(object value)
  7 {
  8 	OracleDbType dataType = OracleDbType.Object;
  9 	if (value is string[])
 10 	{
 11 		dataType = OracleDbType.Varchar2;
 12 	}
 13 	else if (value is DateTime[])
 14 	{
 15 		dataType = OracleDbType.TimeStamp;
 16 	}
 17 	else if (value is int[] || value is short[])
 18 	{
 19 		dataType = OracleDbType.Int32;
 20 	}
 21 	else if (value is long[])
 22 	{
 23 		dataType = OracleDbType.Int64;
 24 	}
 25 	else if (value is decimal[] || value is double[] || value is float[])
 26 	{
 27 		dataType = OracleDbType.Decimal;
 28 	}
 29 	else if (value is Guid[])
 30 	{
 31 		dataType = OracleDbType.Varchar2;
 32 	}
 33 	else if (value is bool[] || value is Boolean[])
 34 	{
 35 		dataType = OracleDbType.Byte;
 36 	}
 37 	else if (value is byte[])
 38 	{
 39 		dataType = OracleDbType.Blob;
 40 	}
 41 	else if (value is char[])
 42 	{
 43 		dataType = OracleDbType.Char;
 44 	}
 45 	return dataType;
 46 }
GetOracleDbType說明:如果采用分次(每次1萬數據)執行 InsertBigData 方法,速度反而比一次性執行 InsertBigData 方法慢,詳見下面測試結果;

測試結果:

無索引,數據類型:4列NVARCHAR2,2列NUMBER

30+萬(7.36M):一次性導入用時 15:623,每次10000導入用時 

60+萬(14.6M):一次性導入用時 28:207,每次10000導入用時 1:2:300

100+萬(24.9M):一次性導入報如下異常

此時實際上從資源監視器上可以得知仍有可用內存,但是仍舊報 OutOfMemoryException,所以猜測應該是一個 bug;

如果每次10000導入用時 2:9:252

如果每次50000導入用時 58:101

附加 InsertBigData 方法使用示例:

  1 // 每10000數據導入一次
  2 Dictionary<string, object> columnsData = new Dictionary<string, object>();
  3 int dataCount = m_SourceDataTable.Rows.Count;
  4 int times = dataCount / 10000 + (dataCount % 10000 == 0 ? 0 : 1);
  5 for (int i = 0; i < times; i++)
  6 {
  7 	int startIndex = i * 10000;
  8 	int endIndex = (i + 1) * 10000;
  9 	endIndex = endIndex > dataCount ? dataCount : endIndex;
 10 	int currDataCount = endIndex - startIndex;
 11 	columnsData.Add("COL1", new string[currDataCount]);
 12 	columnsData.Add("COL2", new string[currDataCount]);
 13 	columnsData.Add("COL3", new decimal[currDataCount]);
 14 	columnsData.Add("COL4", new string[currDataCount]);
 15 	columnsData.Add("COL5", new decimal[currDataCount]);
 16 	columnsData.Add("COL6", new string[currDataCount]);
 17 	for (int rowIndex = startIndex; rowIndex < endIndex; rowIndex++)
 18 	{
 19 		int dicRowIndex = rowIndex - startIndex;// 列數據行索引
 20 		foreach (string colName in columnsData.Keys)
 21 		{
 22 			object cell = m_SourceDataTable.Rows[rowIndex][colName];
 23 			string cellStr = (cell + "").TrimEnd(new char[] { '\0', ' ' });
 24 			if (colName == "COL3" || colName == "COL5")
 25 			{
 26 				decimal value = 0;
 27 				decimal.TryParse(cellStr, out value);
 28 				((decimal[])columnsData[colName])[dicRowIndex] = value;
 29 			}
 30 			else
 31 			{
 32 				((string[])columnsData[colName])[dicRowIndex] = cellStr;
 33 			}
 34 		}
 35 	}
 36 	m_DAL.InsertBigData(columnsData, currDataCount);
 37 
 38 	columnsData.Clear();
 39 	GC.Collect();
 40 	GC.WaitForFullGCComplete();
 41 }
View Code

方式二:OracleBulkCopy

說明:

1. OracleBulkCopy 采用 direct path 方式導入;

2. 不支持 transaction,無法 Rollback;

3. 如果該表存在觸發器時,無法使用 OracleBulkCopy(報異常信息 Oracle Error: ORA-26086),除非先禁用該表的所有觸發器;

4. 過程中會自動啟用 NOT NULL、UNIQUE 和 PRIMARY KEY 三種約束,其中 NOT NULL 約束在列數組綁定時驗證,任何違反 NOT NULL 約束條件的行數據都會捨棄;UNIQUE 約束是在導入完成後重建索引時驗證,但是在 bulk copy 時,允許違反索引約束,並在完成後將索引設置成禁用(UNUSABLE)狀態;而且,如果索引一開始狀態就是禁用(UNUSABLE)狀態時,OracleBulkCopy 是會報錯的。

參考代碼如下:

  1 /// <summary>
  2 /// 批量插入數據
  3 /// 該方法需要禁用該表所有觸發器,並且插入的數據如果為空,是不會采用默認值
  4 /// </summary>
  5 /// <param name="table">數據表</param>
  6 /// <param name="targetTableName">數據庫目標表名</param>
  7 /// <returns></returns>
  8 public bool InsertBulkData(DataTable table, string targetTableName)
  9 {
 10 	bool result = false;
 11 	string connStr = GetConnectionString();
 12 	using (OracleConnection connection = new OracleConnection(connStr))
 13 	{
 14 		using (OracleBulkCopy bulkCopy = new OracleBulkCopy(connStr, OracleBulkCopyOptions.Default))
 15 		{
 16 			if (table != null && table.Rows.Count > 0)
 17 			{
 18 				bulkCopy.DestinationTableName = targetTableName;
 19 				for (int i = 0; i < table.Columns.Count; i++)
 20 				{
 21 					string col = table.Columns[i].ColumnName;
 22 					bulkCopy.ColumnMappings.Add(col, col);
 23 				}
 24 				connection.Open();
 25 				bulkCopy.WriteToServer(table);
 26 				result = true;
 27 			}
 28 			bulkCopy.Close();
 29 			bulkCopy.Dispose();
 30 		}
 31 	}
 32 
 33 	return result;
 34 }

測試結果:

數據類型:4列NVARCHAR2,2列NUMBER

30+萬(7.36M):用時 14:590

60+萬(14.6M):用時 28:28

1048576(24.9M):用時 52:971

附加,禁用表的所有外鍵SQL:

ALTER TABLE table_name DISABLE ALL TRIGGERS

總結

1、在30+萬和60+萬數據時,ArrayBind一次性導入和OracleBulkCopy時間相差不是很大,但是ArrayBind方式一般都需要轉換數據形式,占用了一些時間,而 OracleBulkCopy 則只需要簡單處理一下 DataTable 數據源即可導入;

2、當數據量達到100+萬時,ArrayBind很容易出現內存不足異常,此時只能采用分批次執行導入,根據測試結果可知,次數越少,速度越快;而采用 OracleBulkCopy 方式則很少出現內存不足現象,由此可見 OracleBulkCopy 占用內存比 ArrayBind 方式少;

參考資料:

1、ArrayBind http://www.oracle.com/technetwork/issue-archive/2009/09-sep/o59odpnet-085168.html

2、ArrayBind http://www.soaspx.com/dotnet/csharp/csharp_20130911_10501.html

3、Oracle數據導入方法 http://dbanotes.net/Oracle/All_About_Oracle_Data_Loading.htm

4、介紹OracleBulkCopy類 https://docs.oracle.com/cd/E11882_01/win.112/e23174/OracleBulkCopyClass.htm#ODPNT7446

5、http://dba.stackexchange.com/questions/7287/what-specifically-does-oraclebulkcopy-do-and-how-can-i-optimize-its-performance

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved