我有两个线程,这两个个线程A、B互斥 且 都是批处理形式,所以如果某个线程还没执行完所有数据,那么下一次执行的时候,还是得进入此线程中。并且当有一个接口被调用的时候,如果A,B线程此时都没有数据处理的话,插入数据并且指定B线程执行。
我采用的是两种变量互斥,但是还是会存在相关线程安全问题。所以有没有更好的方法去处理了。
相关代码
线程A:
public class AaHistoryTask extends TimerTask {
@Override
public void run() {
try {
//丑陋的状态控制,如果B线程在数据处理中,退出。
if(AsStatusUtil.getAsSettleRepay()) {
return;
}
boolean hasIdleThreads = false;
List<FactoringsDbCore.AaHistory.Builder> aahs = Db2Pb.toAaHistory(
QueryMethods.queryAaHistory(BATCH_SIZE, null));
if (aahs.size() > 0) {
if (!AsStatusUtil.setAsAah(true)) {
return;
}
logger.info("AaHistoryTask inPutAaHistorySize: " + aahs.size());
} else {
//修改aah状态为false
AsStatusUtil.setAsAah(false);
return;
}
for (FactoringsDbCore.AaHistory.Builder aah : aahs) {
SUB_AA_HISTORY_TASKS[(int) (Long.parseLong(aah.getAccountId()/*.substring(0,17)*/) % SUB_TASK_COUNT)].addNewAah(aah);
}
} catch (Exception e) {
logger.error("", e);
}
}
}
线程B:
public class SettleKeeperTask extends TimerTask {
@Override
public void run() {
try {
//丑陋的状态控制实现,A线程处理数据中,退出
if (AsStatusUtil.getAsAah()) {
return;
}
List<FactoringsDbCore.AccountSum.Builder> ass = Db2Pb.toAccountSum(
QueryMethods.queryAccountSumToBeSettle(BATCH_SIZE, null));
if (ass.size() > 0) {
if(!AsStatusUtil.setAsSettleRepay(true)) {
return;
}
logger.info("SettleKeeperTask settle inPutAccountSumSize: " + ass.size());
//开始
SettleRepayKeeper.settle(ass);
} else {
//开始
List<FactoringsDbCore.Repay.Builder> repayList = Db2Pb.toRepay(
QueryMethods.queryRepay(FactoringsDbBase.RepayStatus.RS_REPAYING, 1, null));
if(repayList.size() > 0) {
if(!AsStatusUtil.setAsSettleRepay(true)) {
return;
}
//全部处理完之后,状态置为false
SettleRepayKeeper.repay(repayList.get(0), BATCH_SIZE);
} else {
AsStatusUtil.setAsSettleRepay(false);
}
}
} catch (Exception e) {
logger.error("", e);
}
}
}
AsStatusUtil类是:
public class AsStatusUtil {
//B线程处理状态,处理中为true,处理完为false
private static boolean asSettleRepay = false;
//A线程处理状态,true表示执行中,false表示未开始或者已完成[默认AAH先处理]
private static boolean asAah = true;
public static synchronized boolean getAsSettleRepay() {
return asSettleRepay;
}
public static synchronized boolean getAsAah() {
return asAah;
}
public static synchronized boolean setAsSettleRepay(boolean flag) {
//如果A正在执行,返回false,无法切换状态
if(asAah) {
return false;
}
asSettleRepay = flag;
return true;
}
public static synchronized boolean setAsAah(boolean flag) {
//如果B正在执行,返回false,无法切换状态
if(asSettleRepay) {
return false;
}
asAah = flag;
return true;
}
}