diff --git a/src/main/java/io/openmessaging/storage/dledger/client/DLedgerClient.java b/src/main/java/io/openmessaging/storage/dledger/client/DLedgerClient.java index 30b0e156..24a343f9 100644 --- a/src/main/java/io/openmessaging/storage/dledger/client/DLedgerClient.java +++ b/src/main/java/io/openmessaging/storage/dledger/client/DLedgerClient.java @@ -29,6 +29,9 @@ import io.openmessaging.storage.dledger.protocol.LeadershipTransferRequest; import io.openmessaging.storage.dledger.utils.DLedgerUtils; +import io.openmessaging.storage.dledger.utils.PreConditions; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -133,6 +136,39 @@ public LeadershipTransferResponse leadershipTransfer(String curLeaderId, String } } + public MetadataResponse getMetadata() { + List responses = new ArrayList<>(peerMap.size()); + List> futures = new ArrayList<>(peerMap.size()); + for (Map.Entry peer : peerMap.entrySet()) { + MetadataRequest request = new MetadataRequest(); + request.setGroup(group); + request.setRemoteId(peer.getKey()); + try { + CompletableFuture future = dLedgerClientRpcService.metadata(request); + futures.add(future); + } catch (Exception e) { + logger.warn("Get metadata failed", e); + } + } + if (futures.size() == 0) { + return null; + } + for (CompletableFuture future : futures) { + try { + MetadataResponse response = future.get(1500, TimeUnit.MILLISECONDS); + responses.add(response); + } catch (Throwable t) { + logger.warn("Get metadata failed", t); + } + } + if (responses.size() == 0) { + return null; + } + PreConditions.check(responses.stream().allMatch(response -> response.getPeers().equals(this.peerMap)), + DLedgerResponseCode.METADATA_ERROR); + return responses.get(0); + } + public void startup() { this.dLedgerClientRpcService.startup(); this.metadataUpdater.start(); diff --git a/src/main/java/io/openmessaging/storage/dledger/cmdline/BossCommand.java b/src/main/java/io/openmessaging/storage/dledger/cmdline/BossCommand.java index d372ea3f..5b0e8659 100644 --- a/src/main/java/io/openmessaging/storage/dledger/cmdline/BossCommand.java +++ b/src/main/java/io/openmessaging/storage/dledger/cmdline/BossCommand.java @@ -31,6 +31,7 @@ public static void main(String args[]) { commands.put("get", new GetCommand()); commands.put("readFile", new ReadFileCommand()); commands.put("leadershipTransfer", new LeadershipTransferCommand()); + commands.put("metadata", new MetadataCommand()); JCommander.Builder builder = JCommander.newBuilder(); builder.addCommand("server", new DLedgerConfig()); diff --git a/src/main/java/io/openmessaging/storage/dledger/cmdline/MetadataCommand.java b/src/main/java/io/openmessaging/storage/dledger/cmdline/MetadataCommand.java new file mode 100644 index 00000000..a5ba93d7 --- /dev/null +++ b/src/main/java/io/openmessaging/storage/dledger/cmdline/MetadataCommand.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.openmessaging.storage.dledger.cmdline; + +import com.alibaba.fastjson.JSON; +import com.beust.jcommander.Parameter; +import io.openmessaging.storage.dledger.client.DLedgerClient; +import io.openmessaging.storage.dledger.protocol.MetadataResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MetadataCommand extends BaseCommand { + private static Logger logger = LoggerFactory.getLogger(GetCommand.class); + + @Parameter(names = {"--group", "-g"}, description = "Group of this server") + private String group = "default"; + + @Parameter(names = {"--peers", "-p"}, description = "Peer info of this server") + private String peers = "n0-localhost:20911"; + + @Override + public void doCommand() { + DLedgerClient dLedgerClient = new DLedgerClient(group, peers); + dLedgerClient.startup(); + MetadataResponse response = dLedgerClient.getMetadata(); + logger.info("Get Result:{}", JSON.toJSONString(response)); + dLedgerClient.shutdown(); + } +} \ No newline at end of file