diff --git a/ojc-core/databasebc/databasebcimpl/src/org/glassfish/openesb/databasebc/InboundMessageProcessor.java b/ojc-core/databasebc/databasebcimpl/src/org/glassfish/openesb/databasebc/InboundMessageProcessor.java index 8434601e4..39f8c1627 100644 --- a/ojc-core/databasebc/databasebcimpl/src/org/glassfish/openesb/databasebc/InboundMessageProcessor.java +++ b/ojc-core/databasebc/databasebcimpl/src/org/glassfish/openesb/databasebc/InboundMessageProcessor.java @@ -751,18 +751,68 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi return str; } + public void doPostProcessing(Connection connection, List pkList) throws Exception + { + PreparedStatement ps; + String sql = ""; + try + { + for (final Iterator it = pkList.iterator(); it.hasNext();) + { + String pkValue = (String) it.next(); + if (mPollingPostProcessing.equalsIgnoreCase("CopyRow") || + mPollingPostProcessing.equalsIgnoreCase("MoveRow")) + { + sql = "INSERT INTO "+mMoveRowToTableName+" SELECT * FROM "+mTableName+" WHERE "+mPKName+"=?"; + mLogger.log(Level.INFO, "Executing sql 4. " + sql); + ps = connection.prepareStatement(sql); + JDBCUtil.bindParams(ps, pkValue); + final int count = ps.executeUpdate(); + mLogger.log(Level.FINE, "Inserted records: " + count); + ps.close(); + } + if (mPollingPostProcessing.equalsIgnoreCase("Delete") || + mPollingPostProcessing.equalsIgnoreCase("MoveRow")) + { + sql = "DELETE FROM "+mTableName+" WHERE "+mPKName+"=?"; + mLogger.log(Level.INFO, "Executing sql 2. " + sql); + ps = connection.prepareStatement(sql); + JDBCUtil.bindParams(ps, pkValue); + final int delcount = ps.executeUpdate(); + mLogger.log(Level.FINE, "Deleted records: " + delcount); + ps.close(); + } + if (mPollingPostProcessing.equalsIgnoreCase("MarkColumn") || + mPollingPostProcessing.equalsIgnoreCase("CopyRow")) + { + sql = "UPDATE "+mTableName+" SET "+mMarkColumnName+"=? WHERE "+mPKName+"=?"; + mLogger.log(Level.INFO, "Executing sql 3. " + sql); + ps = connection.prepareStatement(sql); + JDBCUtil.bindParams(ps, mMarkColumnValue, pkValue); + final int count = ps.executeUpdate(); + mLogger.log(Level.FINE, "Updated records: " + count); + ps.close(); + } + } + } + catch (final Exception se) + { + final String msg = InboundMessageProcessor.mMessages.getString("DBBC_E00639.IMP_Failed_Executing_SQL") + sql; + mLogger.log(Level.SEVERE, msg, new Object[] { se.getLocalizedMessage() }); + throw se; + } + } + /** * @param exchange * @throws Exception */ //@Override public synchronized void processReplyMessage(final MessageExchange exchange) throws Exception { - String sql = null; String jndiName = null; Transaction tx = null; boolean isTransacted = exchange.isTransacted(); Connection connection = null; - PreparedStatement ps = null; if (!(exchange instanceof InOnly) && !(exchange instanceof InOut)) { mLogger.log(Level.SEVERE, "DBBC_E00647.IMP_Unsupported_exchange_pattern", @@ -773,11 +823,8 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi final String messageId = exchange.getExchangeId(); try { if (InboundMessageProcessor.mInboundExchanges.containsKey(messageId)) { - if (exchange.getStatus() == ExchangeStatus.DONE) { try { - jndiName = epb.getValue(EndpointBean.JDBC_DATABASE_JNDI_NAME); - mLogger.log(Level.INFO, InboundMessageProcessor.mMessages.getString("DBBC_R00629.OMP_UsedJNDI") + jndiName); if (isTransacted && exchange instanceof InOnly) { tx = (Transaction) exchange.getProperty(MessageExchange.JTA_TRANSACTION_PROPERTY_NAME); try { @@ -787,7 +834,7 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi // for in-only there's no sending of status back to nmr // failure will be logged mLogger.log(Level.WARNING, "DBBC_E00651.IMP_RESUME_FAILED", - new Object[] { ex.getLocalizedMessage() }); + new Object[] { ex.getLocalizedMessage() }); } try { if (tx.getStatus() == Status.STATUS_MARKED_ROLLBACK) { @@ -798,7 +845,7 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi // for in-only there's no sending of status back to nmr // failure will be logged mLogger.log(Level.WARNING, "DBBC_E00652.IMP_ROLLBACK_FAILED", - new Object[] { ex.getLocalizedMessage() }); + new Object[] { ex.getLocalizedMessage() }); } } } catch (Exception ex) { @@ -811,122 +858,10 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi new Object[] { ex.getLocalizedMessage() }); } } + jndiName = epb.getValue(EndpointBean.JDBC_DATABASE_JNDI_NAME); + mLogger.log(Level.INFO, InboundMessageProcessor.mMessages.getString("DBBC_R00629.OMP_UsedJNDI") + jndiName); connection = getDatabaseConnection(jndiName); -// if (isTransacted && exchange instanceof InOnly) { - connection.setAutoCommit(true); -// } - //final List records = epb.getProcessList(); - final List records = (List)mMapInboundExchangesProcessRecords.get(messageId); - for (final Iterator it = records.iterator(); it.hasNext();) { - String pkNameRet = (String) it.next(); - String pkNameValue = pkNameRet; - if (mPKType.equalsIgnoreCase("LONGVARCHAR") || mPKType.equalsIgnoreCase("CHAR") - || mPKType.equalsIgnoreCase("VARCHAR")) { - pkNameRet = "'" + pkNameRet + "'"; - } - if (mPollingPostProcessing.equalsIgnoreCase("Delete")) { - sql = dbDataAccessObject.createQualifiedQuery(mTableName, mMoveRowToTableName, - mMarkColumnName, mMarkColumnValue, mPKName, "DELETE", mFlagColumnType); - sql = sql + "=" + pkNameRet; - mLogger.log(Level.INFO, "Executing sql 2. " + sql); - ps = connection.prepareStatement(sql); - - final int delcount = ps.executeUpdate(); - mLogger.log(Level.FINE, "Records deleted are:" + delcount); - try { - if (ps != null) { - ps.close(); - } - } catch (final SQLException se) { - if (mLogger.isLoggable(Level.FINEST)) { - mLogger.log(Level.FINEST, mMessages.getString("DBBC_E00628.OMP_Cleanup_Failure"), se); - } else if (mLogger.isLoggable(Level.INFO)) { - mLogger.info(mMessages.getString("DBBC_E00628.OMP_Cleanup_Failure")); - } - mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E00628.OMP_Cleanup_Failure"), se); - } - } else if (mPollingPostProcessing.equalsIgnoreCase("MarkColumn")) { - sql = dbDataAccessObject.createQualifiedQuery(mTableName, mMoveRowToTableName, - mMarkColumnName, mMarkColumnValue, mPKName, "UPDATE", mFlagColumnType); - sql = sql + "=" + pkNameRet; - mLogger.log(Level.INFO, "Executing sql 3. " + sql); - ps = connection.prepareStatement(sql); - - final int count = ps.executeUpdate(); - mLogger.log(Level.FINE, "Records updated are " + count); - try { - if (ps != null) { - ps.close(); - } - } catch (final SQLException se) { - mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E00628.OMP_Cleanup_Failure"), se); - } - } else if (mPollingPostProcessing.equalsIgnoreCase("CopyRow")) { - - sql = dbDataAccessObject.createQualifiedQuery(mTableName, mMoveRowToTableName, - mMarkColumnName, mMarkColumnValue, mPKName, "INSERT", mFlagColumnType); - sql = sql + "=" + pkNameRet; - mLogger.log(Level.INFO, "Executing sql 4. " + sql); - ps = connection.prepareStatement(sql); - - final int count = ps.executeUpdate(); - mLogger.log(Level.FINE, "Records updated are " + count); - try { - if (ps != null) { - ps.close(); - } - } catch (final SQLException se) { - mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E00628.OMP_Cleanup_Failure"), se); - } - sql = dbDataAccessObject.createQualifiedQuery(mTableName, mMoveRowToTableName, - mMarkColumnName, mMarkColumnValue, mPKName, "UPDATE", mFlagColumnType); - sql = sql + "=" + pkNameRet; - mLogger.log(Level.INFO, "Executing sql 5. " + sql); - ps = connection.prepareStatement(sql); - - final int updatecount = ps.executeUpdate(); - mLogger.log(Level.FINE, "Records updated are " + updatecount); - try { - if (ps != null) { - ps.close(); - } - } catch (final SQLException se) { - mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E00628.OMP_Cleanup_Failure"), se); - } - } else if (mPollingPostProcessing.equalsIgnoreCase("MoveRow")) { - sql = dbDataAccessObject.createQualifiedQuery(mTableName, mMoveRowToTableName, - mMarkColumnName, mMarkColumnValue, mPKName, "INSERT", mFlagColumnType); - sql = sql + "=" + pkNameRet; - mLogger.log(Level.INFO, "Executing sql 6. " + sql); - ps = connection.prepareStatement(sql); - - final int count = ps.executeUpdate(); - mLogger.log(Level.FINE, "Records updated are " + count); - try { - if (ps != null) { - ps.close(); - } - } catch (final SQLException se) { - mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E00628.OMP_Cleanup_Failure"), se); - } - sql = dbDataAccessObject.createQualifiedQuery(mTableName, mMoveRowToTableName, - mMarkColumnName, mMarkColumnValue, mPKName, "DELETE", mFlagColumnType); - sql = sql + "=" + pkNameRet; - mLogger.log(Level.INFO, "Executing sql 7. " + sql); - ps = connection.prepareStatement(sql); - - final int delcount = ps.executeUpdate(); - mLogger.log(Level.FINE, "Records deleted are:" + delcount); - try { - if (ps != null) { - ps.close(); - } - } catch (final SQLException se) { - mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E00628.OMP_Cleanup_Failure"), se); - } - } // else if - mProcessedList.remove(pkNameValue); - } + doPostProcessing(connection, (List)mMapInboundExchangesProcessRecords.get(messageId)); if (isTransacted && exchange instanceof InOnly) { try { // As we are the initiator for tx we have to commit @@ -938,22 +873,7 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi new Object[] { ex.getLocalizedMessage() }); } } - - } catch (final SQLException ex) { - final String msg = InboundMessageProcessor.mMessages.getString("DBBC_E00639.IMP_Failed_Executing_SQL") - + sql; - mLogger.log(Level.SEVERE, msg, new Object[] {ex.getLocalizedMessage()}); - if (isTransacted && exchange instanceof InOnly) { - try { - // As we are the initiator for tx we have to rollback - rollbackThreadTx(exchange); - } catch (Exception exception) { - mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E00653.IMP_XA_TX_ROLLBACK_FAILED"), exception); - } - } } catch (final Exception ex) { - final String msg = InboundMessageProcessor.mMessages.getString("DBBC_E00639.IMP_Failed_Executing_SQL"); - mLogger.log(Level.SEVERE, msg, new Object[] {ex.getLocalizedMessage()}); if (isTransacted && exchange instanceof InOnly) { try { // As we are the initiator for tx we have to rollback @@ -991,11 +911,6 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi } } } else { - final List records = (List)mMapInboundExchangesProcessRecords.get(messageId); - for (final Iterator it = records.iterator(); it.hasNext();) { - String pkNameRet = (String) it.next(); - mProcessedList.remove(pkNameRet); - } mLogger.log(Level.SEVERE, "IMP_MXCH_BAD_STATUS", new Object[] { exchange.getStatus().toString(), messageId }); if (isTransacted && exchange instanceof InOnly) { try { @@ -1011,7 +926,11 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi throw new Exception(msgError); } } - + final List records = (List)mMapInboundExchangesProcessRecords.get(messageId); + for (final Iterator it = records.iterator(); it.hasNext();) { + String pkNameRet = (String)it.next(); + mProcessedList.remove(pkNameRet); + } if (mLogger.isLoggable(Level.INFO)) { mLogger.log(Level.INFO, "DBBC_E00648.IMP_Remove_exchange_msg_id", messageId); } @@ -1021,13 +940,6 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi } finally { InboundMessageProcessor.mInboundExchanges.remove(messageId); mMapInboundExchangesProcessRecords.remove(messageId); - try { - if(ps != null) { - ps.close(); - } - }catch(SQLException se){ - mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E11110.IMP_EXCEPTION_WHILE_CLOSING_THE_PS"), se); - } try{ if(connection != null) { connection.close(); diff --git a/ojc-core/databasebc/databasebcimpl/src/org/glassfish/openesb/databasebc/JDBCUtil.java b/ojc-core/databasebc/databasebcimpl/src/org/glassfish/openesb/databasebc/JDBCUtil.java index a7ef9c261..9b10de903 100644 --- a/ojc-core/databasebc/databasebcimpl/src/org/glassfish/openesb/databasebc/JDBCUtil.java +++ b/ojc-core/databasebc/databasebcimpl/src/org/glassfish/openesb/databasebc/JDBCUtil.java @@ -47,6 +47,8 @@ import javax.xml.datatype.DatatypeFactory; import javax.xml.datatype.XMLGregorianCalendar; import javax.sql.rowset.serial.SerialBlob; import javax.sql.rowset.serial.SerialClob; +import java.sql.PreparedStatement; +import java.sql.ParameterMetaData; import java.sql.ResultSet; import java.sql.ResultSetMetaData; @@ -386,4 +388,15 @@ public class JDBCUtil { JDBCOperations jdbcOps = JDBCOperations.getJDBCOperations(opName); return jdbcOps.toString();*/ } + + public static void bindParams(PreparedStatement ps, String... params) throws Exception + { + ParameterMetaData meta = ps.getParameterMetaData(); + for (int i = 0; i < params.length; i++) + { + int columnType = java.sql.Types.VARCHAR; + try { columnType = meta.getParameterType(i+1); } catch(Exception e) {} + ps.setObject(i+1, JDBCUtil.convert(params[i], columnType), columnType); + } + } }