diff --git a/source/consensus/consensus-bftsmart/src/main/java/com/jd/blockchain/consensus/bftsmart/service/BftsmartNodeServer.java b/source/consensus/consensus-bftsmart/src/main/java/com/jd/blockchain/consensus/bftsmart/service/BftsmartNodeServer.java index da6c7c36..6b829231 100644 --- a/source/consensus/consensus-bftsmart/src/main/java/com/jd/blockchain/consensus/bftsmart/service/BftsmartNodeServer.java +++ b/source/consensus/consensus-bftsmart/src/main/java/com/jd/blockchain/consensus/bftsmart/service/BftsmartNodeServer.java @@ -5,7 +5,13 @@ import java.util.*; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; + +import bftsmart.consensus.app.BlockResultImpl; import bftsmart.tom.*; +import com.jd.blockchain.binaryproto.BinaryProtocol; +import com.jd.blockchain.consensus.service.*; +import com.jd.blockchain.ledger.*; +import com.jd.blockchain.transaction.TxResponseMessage; import com.jd.blockchain.utils.serialize.binary.BinarySerializeUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -15,18 +21,11 @@ import com.jd.blockchain.consensus.bftsmart.BftsmartConsensusProvider; import com.jd.blockchain.consensus.bftsmart.BftsmartConsensusSettings; import com.jd.blockchain.consensus.bftsmart.BftsmartNodeSettings; import com.jd.blockchain.consensus.bftsmart.BftsmartTopology; -import com.jd.blockchain.consensus.service.MessageHandle; -import com.jd.blockchain.consensus.service.NodeServer; -import com.jd.blockchain.consensus.service.ServerSettings; -import com.jd.blockchain.consensus.service.StateHandle; -import com.jd.blockchain.consensus.service.StateMachineReplicate; -import com.jd.blockchain.ledger.TransactionState; import com.jd.blockchain.utils.PropertiesUtils; import com.jd.blockchain.utils.concurrent.AsyncFuture; import com.jd.blockchain.utils.io.BytesUtils; import bftsmart.reconfiguration.util.HostsConfig; import bftsmart.reconfiguration.util.TOMConfiguration; -import bftsmart.tom.core.messages.TOMMessage; import bftsmart.tom.server.defaultservices.DefaultRecoverable; public class BftsmartNodeServer extends DefaultRecoverable implements NodeServer { @@ -187,94 +186,243 @@ public class BftsmartNodeServer extends DefaultRecoverable implements NodeServer return messageHandle.processUnordered(bytes).get(); } - @Override - public byte[][] appExecuteBatch(byte[][] commands, MessageContext[] msgCtxs, boolean fromConsensus) { - return appExecuteBatch(commands, msgCtxs, fromConsensus, null); + /** + * + * Only block, no reply, used by state transfer when peer start + * + */ + private void block(List manageConsensusCmds) { + + String batchId = messageHandle.beginBatch(realmName); + try { + int msgId = 0; + for (byte[] txContent : manageConsensusCmds) { + AsyncFuture asyncFuture = messageHandle.processOrdered(msgId++, txContent, realmName, batchId); + } + messageHandle.completeBatch(realmName, batchId); + messageHandle.commitBatch(realmName, batchId); + } catch (Exception e) { + // todo 需要处理应答码 404 + LOGGER.error("Error occurred while processing ordered messages! --" + e.getMessage(), e); + messageHandle.rollbackBatch(realmName, batchId, TransactionState.CONSENSUS_ERROR.CODE); + } + } - @Override - public byte[][] appExecuteBatch(byte[][] commands, MessageContext[] msgCtxs, boolean fromConsensus, List replyList) { + /** + * + * Local peer has cid diff with remote peer, used by state transfer when peer start + * + */ + private byte[][] appExecuteDiffBatch(byte[][] commands, MessageContext[] msgCtxs) { - if (replyList == null || replyList.size() == 0) { - throw new IllegalArgumentException(); - } - // todo 此部分需要重新改造 - /** - * 默认BFTSmart接口提供的commands是一个或多个共识结果的顺序集合 - * 根据共识的规定,目前的做法是将其根据msgCtxs的内容进行分组,每组都作为一个结块标识来处理 - * 从msgCtxs可以获取对应commands的分组情况 - */ int manageConsensusId = msgCtxs[0].getConsensusId(); List manageConsensusCmds = new ArrayList<>(); - List manageReplyMsgs = new ArrayList<>(); int index = 0; for (MessageContext msgCtx : msgCtxs) { if (msgCtx.getConsensusId() == manageConsensusId) { manageConsensusCmds.add(commands[index]); - manageReplyMsgs.add(replyList.get(index)); } else { // 达到结块标准,需要进行结块并应答 - blockAndReply(manageConsensusCmds, manageReplyMsgs); + block(manageConsensusCmds); // 重置链表和共识ID manageConsensusCmds = new ArrayList<>(); - manageReplyMsgs = new ArrayList<>(); manageConsensusId = msgCtx.getConsensusId(); manageConsensusCmds.add(commands[index]); - manageReplyMsgs.add(replyList.get(index)); } index++; } // 结束时,肯定有最后一个结块请求未处理 if (!manageConsensusCmds.isEmpty()) { - blockAndReply(manageConsensusCmds, manageReplyMsgs); + block(manageConsensusCmds); } return null; + } + /** + * + * Invoked by state transfer when peer start + * + */ + @Override + public byte[][] appExecuteBatch(byte[][] commands, MessageContext[] msgCtxs, boolean fromConsensus) { + + // Not from consensus outcomes, from state transfer + if (!fromConsensus) { + return appExecuteDiffBatch(commands, msgCtxs); + } + + return null; + } + + /** + * + * From consensus outcomes, do nothing now + * The operation of executing the batch was moved to the consensus stage 2 and 3, in order to guaranteed ledger consistency + */ + @Override + public byte[][] appExecuteBatch(byte[][] commands, MessageContext[] msgCtxs, boolean fromConsensus, List replyList) { + +// if (replyList == null || replyList.size() == 0) { +// throw new IllegalArgumentException(); +// } +// // todo 此部分需要重新改造 +// /** +// * 默认BFTSmart接口提供的commands是一个或多个共识结果的顺序集合 +// * 根据共识的规定,目前的做法是将其根据msgCtxs的内容进行分组,每组都作为一个结块标识来处理 +// * 从msgCtxs可以获取对应commands的分组情况 +// */ +// int manageConsensusId = msgCtxs[0].getConsensusId(); +// List manageConsensusCmds = new ArrayList<>(); +// List manageReplyMsgs = new ArrayList<>(); +// +// int index = 0; +// for (MessageContext msgCtx : msgCtxs) { +// if (msgCtx.getConsensusId() == manageConsensusId) { +// manageConsensusCmds.add(commands[index]); +// manageReplyMsgs.add(replyList.get(index)); +// } else { +// // 达到结块标准,需要进行结块并应答 +// blockAndReply(manageConsensusCmds, manageReplyMsgs); +// // 重置链表和共识ID +// manageConsensusCmds = new ArrayList<>(); +// manageReplyMsgs = new ArrayList<>(); +// manageConsensusId = msgCtx.getConsensusId(); +// manageConsensusCmds.add(commands[index]); +// manageReplyMsgs.add(replyList.get(index)); +// } +// index++; +// } +// // 结束时,肯定有最后一个结块请求未处理 +// if (!manageConsensusCmds.isEmpty()) { +// blockAndReply(manageConsensusCmds, manageReplyMsgs); +// } + return null; + } + + /** + * + * Block and reply are moved to consensus completion stage + * + */ private void blockAndReply(List manageConsensusCmds, List replyList) { +// consensusBatchId = messageHandle.beginBatch(realmName); +// List> asyncFutureLinkedList = new ArrayList<>(manageConsensusCmds.size()); +// try { +// int msgId = 0; +// for (byte[] txContent : manageConsensusCmds) { +// AsyncFuture asyncFuture = messageHandle.processOrdered(msgId++, txContent, realmName, consensusBatchId); +// asyncFutureLinkedList.add(asyncFuture); +// } +// messageHandle.completeBatch(realmName, consensusBatchId); +// messageHandle.commitBatch(realmName, consensusBatchId); +// } catch (Exception e) { +// // todo 需要处理应答码 404 +// LOGGER.error("Error occurred while processing ordered messages! --" + e.getMessage(), e); +// messageHandle.rollbackBatch(realmName, consensusBatchId, TransactionState.CONSENSUS_ERROR.CODE); +// } +// +// // 通知线程单独处理应答 +// notifyReplyExecutors.execute(() -> { +// // 应答对应的结果 +// int replyIndex = 0; +// for(ReplyContextMessage msg : replyList) { +// msg.setReply(asyncFutureLinkedList.get(replyIndex).get()); +// TOMMessage request = msg.getTomMessage(); +// ReplyContext replyContext = msg.getReplyContext(); +// request.reply = new TOMMessage(replyContext.getId(), request.getSession(), request.getSequence(), +// request.getOperationId(), msg.getReply(), replyContext.getCurrentViewId(), +// request.getReqType()); +// +// if (replyContext.getNumRepliers() > 0) { +// bftsmart.tom.util.Logger.println("(ServiceReplica.receiveMessages) sending reply to " +// + request.getSender() + " with sequence number " + request.getSequence() +// + " and operation ID " + request.getOperationId() + " via ReplyManager"); +// replyContext.getRepMan().send(request); +// } else { +// bftsmart.tom.util.Logger.println("(ServiceReplica.receiveMessages) sending reply to " +// + request.getSender() + " with sequence number " + request.getSequence() +// + " and operation ID " + request.getOperationId()); +// replyContext.getReplier().manageReply(request, msg.getMessageContext()); +// } +// replyIndex++; +// } +// }); + } + + /** + * + * Used by consensus write phase, pre compute new block hash + * + */ + public BlockResultImpl preComputeBlockHash(byte[][] commands) { String batchId = messageHandle.beginBatch(realmName); - List> asyncFutureLinkedList = new ArrayList<>(manageConsensusCmds.size()); + List> asyncFutureLinkedList = new ArrayList<>(commands.length); + List responseLinkedList = new ArrayList<>(); try { int msgId = 0; - for (byte[] txContent : manageConsensusCmds) { + for (byte[] txContent : commands) { AsyncFuture asyncFuture = messageHandle.processOrdered(msgId++, txContent, realmName, batchId); asyncFutureLinkedList.add(asyncFuture); } - messageHandle.completeBatch(realmName, batchId); - messageHandle.commitBatch(realmName, batchId); + StateSnapshot stateSnapshot = messageHandle.completeBatch(realmName, batchId); + byte[] blockHashBytes = stateSnapshot.getSnapshot(); + + for (int i = 0; i< asyncFutureLinkedList.size(); i++) { + responseLinkedList.add(asyncFutureLinkedList.get(i).get()); + } + + + return new BlockResultImpl(responseLinkedList, blockHashBytes, batchId); + } catch (Exception e) { // todo 需要处理应答码 404 - LOGGER.error("Error occurred while processing ordered messages! --" + e.getMessage(), e); - messageHandle.rollbackBatch(realmName, batchId, TransactionState.CONSENSUS_ERROR.CODE); + LOGGER.error("Error occurred while processing ordered messages! --" + e.getMessage(), e); + messageHandle.rollbackBatch(realmName, batchId, TransactionState.IGNORED_BY_CONSENSUS_PHASE_PRECOMPUTE_ROLLBACK.CODE); } - // 通知线程单独处理应答 - notifyReplyExecutors.execute(() -> { - // 应答对应的结果 - int replyIndex = 0; - for(ReplyContextMessage msg : replyList) { - msg.setReply(asyncFutureLinkedList.get(replyIndex).get()); - TOMMessage request = msg.getTomMessage(); - ReplyContext replyContext = msg.getReplyContext(); - request.reply = new TOMMessage(replyContext.getId(), request.getSession(), request.getSequence(), - request.getOperationId(), msg.getReply(), replyContext.getCurrentViewId(), - request.getReqType()); - - if (replyContext.getNumRepliers() > 0) { - bftsmart.tom.util.Logger.println("(ServiceReplica.receiveMessages) sending reply to " - + request.getSender() + " with sequence number " + request.getSequence() - + " and operation ID " + request.getOperationId() + " via ReplyManager"); - replyContext.getRepMan().send(request); - } else { - bftsmart.tom.util.Logger.println("(ServiceReplica.receiveMessages) sending reply to " - + request.getSender() + " with sequence number " + request.getSequence() - + " and operation ID " + request.getOperationId()); - replyContext.getReplier().manageReply(request, msg.getMessageContext()); - } - replyIndex++; - } - }); + return null; + } + + /** + * + * Consensus write phase will terminate, new block hash values are inconsistent, update batch messages execute state + * + */ + public List updateBatchResponses(List asyncResponseLinkedList) { + List updatedResponses = new ArrayList<>(); + + for(int i = 0; i < asyncResponseLinkedList.size(); i++) { + TransactionResponse txResponse = BinaryProtocol.decode(asyncResponseLinkedList.get(i)); + TxResponseMessage resp = new TxResponseMessage(txResponse.getContentHash()); + resp.setExecutionState(TransactionState.IGNORED_BY_CONSENSUS_PHASE_PRECOMPUTE_ROLLBACK); + updatedResponses.add(BinaryProtocol.encode(resp, TransactionResponse.class)); + } + + return updatedResponses; + } + + /** + * + * Decision has been made at the consensus stage, commit block + * + */ + public void preComputeBlockCommit(String batchId) { + + messageHandle.commitBatch(realmName, batchId); + + } + + /** + * + * Consensus write phase will terminate, new block hash values are inconsistent, rollback block + * + */ + public void preComputeBlockRollback(String batchId) { + messageHandle.rollbackBatch(realmName, batchId, TransactionState.IGNORED_BY_CONSENSUS_PHASE_PRECOMPUTE_ROLLBACK.CODE); + LOGGER.debug("Rollback of operations that cause inconsistencies in the ledger"); } //notice diff --git a/source/contract/contract-samples/pom.xml b/source/contract/contract-samples/pom.xml index 22b493e5..fc3a89b9 100644 --- a/source/contract/contract-samples/pom.xml +++ b/source/contract/contract-samples/pom.xml @@ -34,11 +34,11 @@ maven-assembly-plugin - transfer + random false - com.jd.blockchain.contract.TransferContractImpl + com.jd.blockchain.contract.RandomContractImpl diff --git a/source/contract/contract-samples/src/main/java/com/jd/blockchain/contract/RandomContract.java b/source/contract/contract-samples/src/main/java/com/jd/blockchain/contract/RandomContract.java new file mode 100644 index 00000000..3df56f82 --- /dev/null +++ b/source/contract/contract-samples/src/main/java/com/jd/blockchain/contract/RandomContract.java @@ -0,0 +1,11 @@ +package com.jd.blockchain.contract; + +@Contract +public interface RandomContract { + + @ContractEvent(name = "random-put") + void put(String address, String key, String value); + + @ContractEvent(name = "random-putAndGet") + String putAndGet(String address, String key, String value); +} diff --git a/source/contract/contract-samples/src/main/java/com/jd/blockchain/contract/RandomContractImpl.java b/source/contract/contract-samples/src/main/java/com/jd/blockchain/contract/RandomContractImpl.java new file mode 100644 index 00000000..55f49661 --- /dev/null +++ b/source/contract/contract-samples/src/main/java/com/jd/blockchain/contract/RandomContractImpl.java @@ -0,0 +1,43 @@ +package com.jd.blockchain.contract; + +import com.jd.blockchain.crypto.HashDigest; + +import java.util.Random; + +public class RandomContractImpl implements RandomContract, EventProcessingAware { + + private static final Random RANDOM_TIME = new Random(); + + private ContractEventContext eventContext; + + private HashDigest ledgerHash; + + @Override + public void beforeEvent(ContractEventContext eventContext) { + this.eventContext = eventContext; + this.ledgerHash = eventContext.getCurrentLedgerHash(); + } + + @Override + public void postEvent(ContractEventContext eventContext, Exception error) { + + } + + @Override + public void put(String address, String key, String value) { + + String saveVal = value + "-" + RANDOM_TIME.nextInt(1024); + + eventContext.getLedger().dataAccount(address).setText(key, saveVal, -1L); + } + + @Override + public String putAndGet(String address, String key, String value) { + + String saveVal = value + "-" + RANDOM_TIME.nextInt(1024); + + eventContext.getLedger().dataAccount(address).setText(key, saveVal, -1L); + + return address; + } +} diff --git a/source/ledger/ledger-model/src/main/java/com/jd/blockchain/ledger/TransactionState.java b/source/ledger/ledger-model/src/main/java/com/jd/blockchain/ledger/TransactionState.java index 6955eb94..518d793e 100644 --- a/source/ledger/ledger-model/src/main/java/com/jd/blockchain/ledger/TransactionState.java +++ b/source/ledger/ledger-model/src/main/java/com/jd/blockchain/ledger/TransactionState.java @@ -74,6 +74,13 @@ public enum TransactionState { IGNORED_BY_BLOCK_FULL_ROLLBACK((byte) 0x44), /** + * + * 共识阶段加入新区块哈希预计算功能, 如果来自其他Peer的新区块哈希值不一致,本批次整体回滚 + * + */ + IGNORED_BY_CONSENSUS_PHASE_PRECOMPUTE_ROLLBACK((byte) 0x45), + + /** * 系统错误; */ SYSTEM_ERROR((byte) 0x80), diff --git a/source/peer/src/main/java/com/jd/blockchain/peer/consensus/ConsensusMessageDispatcher.java b/source/peer/src/main/java/com/jd/blockchain/peer/consensus/ConsensusMessageDispatcher.java index be312e44..7a3ba62f 100644 --- a/source/peer/src/main/java/com/jd/blockchain/peer/consensus/ConsensusMessageDispatcher.java +++ b/source/peer/src/main/java/com/jd/blockchain/peer/consensus/ConsensusMessageDispatcher.java @@ -245,6 +245,9 @@ public class ConsensusMessageDispatcher implements MessageHandle { realmLock.lock(); try { batchResultHandle.cancel(TransactionState.valueOf((byte)reasonCode)); + currBatchId = null; + txResponseMap = null; + txBatchProcess = null; } finally { realmLock.unlock(); } diff --git a/source/sdk/sdk-samples/src/main/java/com/jd/blockchain/sdk/samples/SDK_Contract_Random_Demo.java b/source/sdk/sdk-samples/src/main/java/com/jd/blockchain/sdk/samples/SDK_Contract_Random_Demo.java new file mode 100644 index 00000000..c73a7f91 --- /dev/null +++ b/source/sdk/sdk-samples/src/main/java/com/jd/blockchain/sdk/samples/SDK_Contract_Random_Demo.java @@ -0,0 +1,120 @@ +package com.jd.blockchain.sdk.samples; + +import com.jd.blockchain.contract.RandomContract; +import com.jd.blockchain.contract.TransferContract; +import com.jd.blockchain.ledger.*; +import com.jd.blockchain.transaction.GenericValueHolder; +import com.jd.blockchain.transaction.LongValueHolder; +import com.jd.blockchain.utils.Bytes; + +import java.util.Random; + +import static com.jd.blockchain.sdk.samples.SDKDemo_Constant.readChainCodes; +import static com.jd.blockchain.transaction.ContractReturnValue.decode; + +public class SDK_Contract_Random_Demo extends SDK_Base_Demo { + + public static void main(String[] args) throws Exception { + new SDK_Contract_Random_Demo().executeContract(); + } + + public void executeContract() throws Exception { + + // 发布jar包 + // 定义交易模板 + TransactionTemplate txTpl = blockchainService.newTransaction(ledgerHash); + + // 将jar包转换为二进制数据 + byte[] contractCode = readChainCodes("random.jar"); + + // 生成一个合约账号 + BlockchainKeypair contractDeployKey = BlockchainKeyGenerator.getInstance().generate(); + + // 生成发布合约操作 + txTpl.contracts().deploy(contractDeployKey.getIdentity(), contractCode); + + // 生成预发布交易; + PreparedTransaction ptx = txTpl.prepare(); + + // 对交易进行签名 + ptx.sign(adminKey); + + // 提交并等待共识返回; + TransactionResponse txResp = ptx.commit(); + + // 获取合约地址 + Bytes contractAddress = contractDeployKey.getAddress(); + + // 打印交易返回信息 + System.out.printf("Tx[%s] -> BlockHeight = %s, BlockHash = %s, isSuccess = %s, ExecutionState = %s \r\n", + txResp.getContentHash().toBase58(), txResp.getBlockHeight(), txResp.getBlockHash().toBase58(), + txResp.isSuccess(), txResp.getExecutionState()); + + // 打印合约地址 + System.out.printf("ContractAddress = %s \r\n", contractAddress.toBase58()); + + String result = create("LdeNzfhZd2qiBRk3YrEX6GZgiVRZJaf3MKJAY", "zhangshuang", "jingdong", contractAddress); + + + Thread.sleep(5000); + System.out.println(result); + } + + private String readAll(String address, String account, Bytes contractAddress) { + TransactionTemplate txTpl = blockchainService.newTransaction(ledgerHash); + // 使用合约创建 + TransferContract transferContract = txTpl.contract(contractAddress, TransferContract.class); + GenericValueHolder result = decode(transferContract.readAll(address, account)); + commit(txTpl); + return result.get(); + } + + private long readByContract(String address, String account, Bytes contractAddress) { + TransactionTemplate txTpl = blockchainService.newTransaction(ledgerHash); + // 使用合约创建 + TransferContract transferContract = txTpl.contract(contractAddress, TransferContract.class); + LongValueHolder result = decode(transferContract.read(address, account)); + commit(txTpl); + return result.get(); + } + + private long readByKvOperation(String address, String account) { + KVDataEntry[] kvDataEntries = blockchainService.getDataEntries(ledgerHash, address, account); + if (kvDataEntries == null || kvDataEntries.length == 0) { + throw new IllegalStateException(String.format("Ledger %s Service inner Error !!!", ledgerHash.toBase58())); + } + KVDataEntry kvDataEntry = kvDataEntries[0]; + if (kvDataEntry.getVersion() == -1) { + return 0L; + } + return (long) (kvDataEntry.getValue()); + } + + private String transfer(String address, String from, String to, long money, Bytes contractAddress) { + TransactionTemplate txTpl = blockchainService.newTransaction(ledgerHash); + // 使用合约创建 + TransferContract transferContract = txTpl.contract(contractAddress, TransferContract.class); + GenericValueHolder result = decode(transferContract.transfer(address, from, to, money)); + commit(txTpl); + return result.get(); + } + + private BlockchainKeypair createDataAccount() { + // 首先注册一个数据账户 + BlockchainKeypair newDataAccount = BlockchainKeyGenerator.getInstance().generate(); + + TransactionTemplate txTpl = blockchainService.newTransaction(ledgerHash); + txTpl.dataAccounts().register(newDataAccount.getIdentity()); + commit(txTpl); + return newDataAccount; + } + + private String create(String address, String account, String value, Bytes contractAddress) { + TransactionTemplate txTpl = blockchainService.newTransaction(ledgerHash); + // 使用合约创建 + RandomContract randomContract = txTpl.contract(contractAddress, RandomContract.class); + GenericValueHolder result = decode(randomContract.putAndGet(address, account, value)); + commit(txTpl); + return result.get(); + } +} diff --git a/source/storage/storage-redis/src/main/java/com/jd/blockchain/storage/service/impl/redis/JedisConnection.java b/source/storage/storage-redis/src/main/java/com/jd/blockchain/storage/service/impl/redis/JedisConnection.java index 0494b53f..27ff22f4 100644 --- a/source/storage/storage-redis/src/main/java/com/jd/blockchain/storage/service/impl/redis/JedisConnection.java +++ b/source/storage/storage-redis/src/main/java/com/jd/blockchain/storage/service/impl/redis/JedisConnection.java @@ -18,7 +18,7 @@ public class JedisConnection implements DbConnection { @Override public void close() { - jedisPool.close(); +// jedisPool.close(); } @Override