`

JMS MDB 与 XA 事务-001

 
阅读更多
什么是JTA

   Java事务API(JTA;Java Transaction API)和它的同胞Java事务服务JTS(Java Transaction Service),为J2EE平台提供了分布式事务服务。

   一个分布式事务(distributed transaction)包括一个事务管理器(transaction manager)和一个或多个资源管理器(resource manager)。一个资源管理器(resource manager)是任意类型的持久化数据存储。事务管理器(transaction manager)承担着所有事务参与单元者的相互通讯的责任。下图显示了事务管理器和资源管理的间的关系。

  JTA事务比JDBC事务更强大。一个JTA事务可以有多个参与者,而一个JDBC事务则被限定在一个单一的数据库连接。下列任一个Java平台的组件都可以参与到一个JTA事务中:
  .JDBC连接
  .JDO PersistenceManager 对象
  .JMS 队列
  .JMS 主题
  .企业JavaBeans(EJB)
  .一个用J2EE Connector Architecture 规范编译的资源分配器。
使用JTA的事务划分
  要用JTA来划分一个事务,应用程序调用javax.transaction.UserTransaction接口中的方法。示例4显示了一个典型的JNDI搜索的UseTransaction对象。
  import javax.transaction.*;
  import javax.naming.*;
  // ...
  InitialContext ctx = new InitialContext();
  Object txObj = ctx.lookup(";java:comp/UserTransaction";);
  UserTransaction utx = (UserTransaction) txObj; 
  应用程序有了UserTransaction对象的引用之后,就可以象示例5那样来起动事务。
   utx.begin();
  // ...
  DataSource ds = obtainXADataSource();
  Connection conn = ds.getConnection();
  pstmt = conn.prepareStatement(";UPDATE MOVIES ...";);
  pstmt.setString(1, ";Spinal Tap";);
  pstmt.executeUpdate();
  // ...
  utx.commit();
  // ...
 当应用程序调用commit()时,事务管理器使用两段提交协议来结束事务。JTA事务控制的方法:
  .javax.transaction.UserTransaction接口提供
使用JTA和JDBC
  应用程序调用begin()来起动事务,即可调用commit()也可以调用rollback()来结束事务。
  开发人员经常使用JDBC来作为DAO类中的底层数据操作。如果计划使用JTA来划分事务,你将需要一个实现了javax.sql.XADataSource,javax.sql.XAConnection和javax.sql.XAResource接口JDBC的驱动。实现了这些接口的驱动将有能力参与到JTA事务中。一个XADataSource对象是一个XAConnection对象的工厂。XAConnections是参与到JTA事务中的连接。
  你需要使用应用程序服务器管理工具来建立XADataSource对象。对于特殊的指令请参考应用程序服务器文档和JDBC驱动文档。
  J2EE应用程序使用JNDI来查找数据源。一旦应用程序有了一个数据源对象的引用,这会调用javax.sql.DataSource.getConnection()来获得数据库的连接。
  XA连接区别于非XA连接。要记住的是XA连接是一个JTA事务中的参与者。这就意味着XA连接不支持JDBC的自动提交特性。也就是说应用程序不必在XA连接上调用java.sql.Connection.commit()或java.sql.Connection.rollback()。相反,应用程序应该使用UserTransaction.begin()、UserTransaction.commit()和UserTransaction.rollback().
  选择最好的方法
  我们已经讨论了JDBC和JTA是怎样划分事务的。每一种方法都有它的优点,因此你需要决定为你的应用程序选择一个最适应的方法。在我们团队许多最近的对于事务划分的项目中使用JDBC API来创建DAO类。这DAO类总结如下:
  .事务划分代码被嵌入到DAO类内部
  .DAO类使用JDBC API来进行事务划分
  .调用者没有划分事务的方法
  .事务范围被限定在一个单一的JDBC连接
  JDBC事务对复杂的企业应用程序不总是有效的。如果你的事务将跨越多个DAO对象或多个数据库,那么下面的实现策略可能会更恰当:
  .用JTA对事务进行划分
  .事务划分代码被DAO分开
  .调用者承担划分事务的责任
  .DAO参与一个全局的事务中
  JDBC方法由于它的简易性而具有吸引力,JTA方法提供了更多灵活性。你选择什么样的实现将依赖于你的应用程序的特定需求。
  JTA(Java Transaction API) 为 J2EE 平台提供了分布式事务服务。
  要用 JTA 进行事务界定,应用程序要调用 javax.transaction.UserTransaction 接口中的方法。例如:
  utx.begin();
  // ...
  DataSource ds = obtainXADataSource();
  Connection conn = ds.getConnection();
  pstmt = conn.prepareStatement("UPDATE MOVIES ...");
  pstmt.setString(1, "Spinal Tap");
  pstmt.executeUpdate();
  // ...
  utx.commit();
  让我们来关注下面的话:
  “用 JTA 界定事务,那么就需要有一个实现 javax.sql.XADataSource 、 javax.sql.XAConnection 和 javax.sql.XAResource 接口的 JDBC 驱动程序。一个实现了这些接口的驱动程序将可以参与 JTA 事务。一个 XADataSource 对象就是一个 XAConnection 对象的工厂。 XAConnection s 是参与 JTA 事务的 JDBC 连接。”
  要使用JTA事务,必须使用XADataSource来产生数据库连接,产生的连接为一个XA连接。
  XA连接(javax.sql.XAConnection)和非XA(java.sql.Connection)连接的区别在于:XA可以参与JTA的事务,而且不支持自动提交。
  Note:
  Oracle, Sybase, DB2, SQL Server等大型数据库才支持XA, 支持分布事务。
  My SQL 连本地都支持不好,更别说分布事务了。
  MySql 在5.0的版本后增加了对xa的支持
JTA方式的实现过程
  用XADataSource产生的XAConnection它扩展了一个getXAResource()方法,事务通过这个方法把它加入到事务容器中进行管理.对于调用者来说,根本看不到事务是如何管理的,你只要声明开始事务,告诉容器我下面的操作要求事务参与了,最后告诉事务说到这儿可以提交或回滚了,别的都是暗箱操作。
  在使用JTA之前,你必须首先实现一个Xid类用来标识事务(在普通情况下这将由事务管理程序来处理)。Xid包含三个元素:formatID、gtrid(全局事务标识符)和bqual(分支修饰词标识符)。
  下面的例子说明Xid的实现:
  import javax.transaction.xa.*;
  public class MyXid implements Xid
  {
  protected int formatId;
  protected byte gtrid[];
  protected byte bqual[];
  public MyXid()
  {
  }
  public MyXid(int formatId, byte gtrid[], byte bqual[])
  {
  this.formatId = formatId;
  this.gtrid = gtrid;
  this.bqual = bqual;
  }
  public int getFormatId()
  {
  return formatId;
  }
  public byte[] getBranchQualifier()
  {
  return bqual;
  }
  public byte[] getGlobalTransactionId()
  {
  return gtrid;
  }
  }
  其次,你需要创建一个你要使用的数据库的数据源:
  public DataSource getDataSource()
  throws SQLException
  {
  SQLServerDataSource xaDS = new
  com.merant.datadirect.jdbcx.sqlserver.SQLServerDataSource();
  xaDS.setDataSourceName("SQLServer");
  xaDS.setServerName("server");
  xaDS.setPortNumber(1433);
  xaDS.setSelectMethod("cursor");
  return xaDS;
  }
  例1?这个例子是用“两步提交协议”来提交一个事务分支:
  XADataSource xaDS;
  XAConnection xaCon;
  XAResource xaRes;
  Xid xid;
  Connection con;
  Statement stmt;
  int ret;
  xaDS = getDataSource();
  xaCon = xaDS.getXAConnection("jdbc_user", "jdbc_password");
  xaRes = xaCon.getXAResource();
  con = xaCon.getConnection();
  stmt = con.createStatement();
  xid = new MyXid(100, new byte[], new byte[]);
  try {
  xaRes.start(xid, XAResource.TMNOFLAGS);
  stmt.executeUpdate("insert into test_table values (100)");
  xaRes.end(xid, XAResource.TMSUCCESS);
  ret = xaRes.prepare(xid);
  if (ret == XAResource.XA_OK) {
  xaRes.commit(xid, false);
  }
  }
  catch (XAException e) {
  e.printStackTrace();
  }
  finally {
  stmt.close();
  con.close();
  xaCon.close();
  }
  当然,实际过程中,我们不需要写这些代码,这些代码是JTA最终的实现代码。

例2?这个例子,与例1相似,说明了一个返回过程:

xaRes.start(xid, XAResource.TMNOFLAGS);
stmt.executeUpdate("insert into test_table values (100)");
xaRes.end(xid, XAResource.TMSUCCESS);
ret = xaRes.prepare(xid);
if (ret == XAResource.XA_OK) {
 xaRes.rollback(xid);
}

  例3?这个例子说明一个分布式事务分支如何中止,让相同的连接做本地事务处理,以及它们稍后该如何继续这个分支。 分布式事务的两步提交作用不影响本地事务。

xid = new MyXid(100, new byte[]{0x01}, new byte[]{0x02});
xaRes.start(xid, XAResource.TMNOFLAGS);
stmt.executeUpdate("insert into test_table values (100)");
xaRes.end(xid, XAResource.TMSUSPEND);
∥这个更新在事务范围之外完成,所以它不受XA返回影响。

stmt.executeUpdate("insert into test_table2 values (111)");
xaRes.start(xid, XAResource.TMRESUME);
stmt.executeUpdate("insert into test_table values (200)");
xaRes.end(xid, XAResource.TMSUCCESS);
ret = xaRes.prepare(xid);

if (ret == XAResource.XA_OK) {

xaRes.rollback(xid);

}

  例4?这个例子说明一个XA资源如何分担不同的事务。 创建了两个事务分支,但是它们不属于相同的分布式事务。 JTA允许XA资源在第一个分支上做一个两步提交,虽然这个资源仍然与第二个分支相关联。

xid1 = new MyXid(100, new byte[]{0x01}, new byte[]{0x02});
xid2 = new MyXid(100, new byte[]{0x11}, new byte[]{0x22});
xaRes.start(xid1, XAResource.TMNOFLAGS);
stmt.executeUpdate("insert into test_table1 values (100)");
xaRes.end(xid1, XAResource.TMSUCCESS);
xaRes.start(xid2, XAResource.TMNOFLAGS);
ret = xaRes.prepare(xid1);
if (ret == XAResource.XA_OK) {
 xaRes.commit(xid2, false);
}
stmt.executeUpdate("insert into test_table2 values (200)");
xaRes.end(xid2, XAResource.TMSUCCESS);
ret = xaRes.prepare(xid2);
if (ret == XAResource.XA_OK) {
 xaRes.rollback(xid2);
}

  例5?这个例子说明不同的连接上的事务分支如何连接成为一个单独的分支,如果它们连接到相同的资源管理程序。这个特点改善了分布式事务的效率,因为它减少了两步提交处理的数目。两个连接到数据库服务器上的XA将被创建。每个连接创建它自己的XA资源,正规的JDBC连接和语句。在第二个XA资源开始一个事务分支之前,它将察看是否使用和第一个XA资源使用的是同一个资源管理程序。如果这是实例,它将加入在第一个XA连接上创建的第一个分支,而不是创建一个新的分支。 稍后,这个事务分支使用XA资源来准备和提交。

xaDS = getDataSource();
xaCon1 = xaDS.getXAConnection("jdbc_user", "jdbc_password");
xaRes1 = xaCon1.getXAResource();
con1 = xaCon1.getConnection();
stmt1 = con1.createStatement();

xid1 = new MyXid(100, new byte[]{0x01}, new byte[]{0x02});
xaRes1.start(xid1, XAResource.TMNOFLAGS);
stmt1.executeUpdate("insert into test_table1 values (100)");
xaRes1.end(xid, XAResource.TMSUCCESS);
xaCon2 = xaDS.getXAConnection("jdbc_user", "jdbc_password");
xaRes2 = xaCon1.getXAResource();
con2 = xaCon1.getConnection();
stmt2 = con1.createStatement();

if (xaRes2.isSameRM(xaRes1)) {
 xaRes2.start(xid1, XAResource.TMJOIN);
 stmt2.executeUpdate("insert into test_table2 values (100)");
 xaRes2.end(xid1, XAResource.TMSUCCESS);
}
else {
 xid2 = new MyXid(100, new byte[]{0x01}, new byte[]{0x03});
 xaRes2.start(xid2, XAResource.TMNOFLAGS);
 stmt2.executeUpdate("insert into test_table2 values (100)");
 xaRes2.end(xid2, XAResource.TMSUCCESS);
 ret = xaRes2.prepare(xid2);
 if (ret == XAResource.XA_OK) {
  xaRes2.commit(xid2, false);
 }
}
ret = xaRes1.prepare(xid1);
if (ret == XAResource.XA_OK) {
 xaRes1.commit(xid1, false);
}


  例6?这个例子说明在错误恢复的阶段,如何恢复准备好的或者快要完成的事务分支。 它首先试图返回每个分支;如果它失败了,它尝试着让资源管理程序丢掉关于事务的消息。

MyXid[] xids;
xids = xaRes.recover(XAResource.TMSTARTRSCAN | XAResource.TMENDRSCAN);
for (int i=0; xids!=null && i  try {
  xaRes.rollback(xids[i]);
 }
 catch (XAException ex) {
  try {
   xaRes.forget(xids[i]);
  }
 catch (XAException ex1) {
  System.out.println("rollback/forget failed: " + ex1.errorCode);
 }
}

}
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics