@@ -93,7 +93,9 @@ protected Map<String, List<String>> getComponentHostsMap() {
String componentName = componentHost.getComponentName();
List<String> hostnames = componentHost.getHostnames();
if (CollectionUtils.isEmpty(hostnames)) {
throw new RuntimeException("No hostnames found for component " + componentName);
// Skip components that have no assigned hosts instead of failing the whole map build
continue;
}
componentHostsMap.put(componentName, hostnames);
}
@@ -52,11 +52,23 @@
<category>server</category>
<cardinality>1+</cardinality>
</component>
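<!-- HDFS HA components: ZKFC and JournalNode -->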
<component>
<name>zkfc</name>
<display-name>ZKFC</display-name>
<category>server</category>
<cardinality>0-2</cardinality>
</component>
<component>
<name>journalnode</name>
<display-name>JournalNode</display-name>
<category>server</category>
<cardinality>0-3</cardinality>
</component>
<component>
<name>secondarynamenode</name>
<display-name>SNameNode</display-name>
<category>server</category>
<cardinality>1</cardinality>
<cardinality>0-1</cardinality>
</component>

<!-- Yarn Components -->
@@ -33,11 +33,14 @@
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import java.io.File;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

@Getter
@Slf4j
@@ -62,6 +65,9 @@ public class HadoopParams extends BigtopParams {
private String dfsNameNodeDir;
private String dfsNameNodeCheckPointDir;
private String dfsDomainSocketPathPrefix;
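// HA-related values parsed from hdfs-site when two NameNodes are deployed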
private String dfsJournalNodeDir;
private String dfsHttpPort;
private String journalHttpPort;

private String nodeManagerLogDir = "/hadoop/yarn/log";
private String nodeManagerLocalDir = "/hadoop/yarn/local";
@@ -99,9 +105,27 @@ public Map<String, Object> hdfsLog4j() {
public Map<String, Object> coreSite() {
Map<String, Object> coreSite = LocalSettings.configurations(getServiceName(), "core-site");
List<String> namenodeList = LocalSettings.componentHosts("namenode");
if (!namenodeList.isEmpty()) {
List<String> zookeeperServerHosts = LocalSettings.componentHosts("zookeeper_server");
Map<String, Object> zooCfg = LocalSettings.configurations("zookeeper", "zoo.cfg");
String clientPort = (String) zooCfg.get("clientPort");
// Build the ZooKeeper quorum string, e.g. "host1:2181,host2:2181,host3:2181"
List<String> zkQuorum = new ArrayList<>();
for (String host : zookeeperServerHosts) {
if (host != null && !host.trim().isEmpty()) {
zkQuorum.add(host.trim() + ":" + clientPort);
}
}
String zkString = String.join(",", zkQuorum);
if (namenodeList.size() == 1) {
coreSite.put(
"fs.defaultFS", ((String) coreSite.get("fs.defaultFS")).replace("localhost", namenodeList.get(0)));
} else if (namenodeList.size() == 2) {
coreSite.put(
"fs.defaultFS", ((String) coreSite.get("fs.defaultFS")).replace("localhost:8020", "nameservice1"));
coreSite.put("ha.zookeeper.quorum", zkString);
}
return coreSite;
}
@@ -115,7 +139,8 @@ public Map<String, Object> hadoopPolicy() {
public Map<String, Object> hdfsSite() {
Map<String, Object> hdfsSite = LocalSettings.configurations(getServiceName(), "hdfs-site");
List<String> namenodeList = LocalSettings.componentHosts("namenode");
if (!namenodeList.isEmpty()) {
List<String> journalNodeList = LocalSettings.componentHosts("journalnode");
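// A single NameNode keeps the plain addresses; two NameNodes switch hdfs-site to an HA nameservice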
if (namenodeList.size() == 1) {
hdfsSite.put(
"dfs.namenode.rpc-address",
((String) hdfsSite.get("dfs.namenode.rpc-address")).replace("0.0.0.0", namenodeList.get(0)));
@@ -125,6 +150,26 @@ public Map<String, Object> hdfsSite() {
hdfsSite.put(
"dfs.namenode.https-address",
((String) hdfsSite.get("dfs.namenode.https-address")).replace("0.0.0.0", namenodeList.get(0)));
} else if (namenodeList.size() == 2) {
hdfsSite.remove("dfs.namenode.http-address");
hdfsSite.put("dfs.ha.automatic-failover.enabled", "true");
hdfsSite.put("dfs.nameservices", "nameservice1");
hdfsSite.put("dfs.ha.namenodes.nameservice1", "nn1,nn2");
hdfsSite.put("dfs.namenode.rpc-address.nameservice1.nn1", namenodeList.get(0) + ":8020");
hdfsSite.put("dfs.namenode.rpc-address.nameservice1.nn2", namenodeList.get(1) + ":8020");
hdfsSite.put("dfs.namenode.http-address.nameservice1.nn1", namenodeList.get(0) + ":9870");
hdfsSite.put("dfs.namenode.http-address.nameservice1.nn2", namenodeList.get(1) + ":9870");
// Join every JournalNode host instead of indexing a fixed-size list, so fewer than three JournalNodes also work
hdfsSite.put(
"dfs.namenode.shared.edits.dir",
"qjournal://"
+ String.join(";", journalNodeList.stream().map(host -> host + ":8485").toList())
+ "/nameservice1");
hdfsSite.put("dfs.journalnode.edits.dir", "/hadoop/dfs/journal");
hdfsSite.put(
"dfs.client.failover.proxy.provider.nameservice1",
"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
hdfsSite.put("dfs.ha.fencing.methods", "shell(/bin/true)");
hdfsSite.put("dfs.replication", "3");
}

// Configure native library dependent settings
@@ -135,12 +180,30 @@ public Map<String, Object> hdfsSite() {
nameNodeFormattedDirs = Arrays.stream(dfsNameNodeDir.split(","))
.map(x -> x + "/namenode-formatted/")
.toList();

String dfsHttpAddress = (String) hdfsSite.get("dfs.namenode.http-address.nameservice1.nn1");
if (dfsHttpAddress != null && dfsHttpAddress.contains(":")) {
String[] parts = dfsHttpAddress.split(":");
if (parts.length >= 2) {
dfsHttpPort = parts[1].trim();
}
}
// Pull the JournalNode port out of the shared edits URI, e.g. "qjournal://host1:8485;host2:8485/nameservice1"
String sharedEditsDir = (String) hdfsSite.get("dfs.namenode.shared.edits.dir");
if (sharedEditsDir != null) {
Matcher matcher = Pattern.compile(":(\\d{1,5})").matcher(sharedEditsDir);
if (matcher.find()) {
journalHttpPort = matcher.group(1);
log.info("Found JournalNode port: {}", journalHttpPort);
} else {
log.warn("JournalNode port not found in dfs.namenode.shared.edits.dir");
}
}
String dfsDomainSocketPath = (String) hdfsSite.get("dfs.domain.socket.path");
if (StringUtils.isNotBlank(dfsDomainSocketPath)) {
dfsDomainSocketPathPrefix = dfsDomainSocketPath.replace("dn._PORT", "");
File file = new File(dfsDomainSocketPath);
dfsDomainSocketPathPrefix = file.getParent();
}
dfsNameNodeCheckPointDir = (String) hdfsSite.get("dfs.namenode.checkpoint.dir");
dfsJournalNodeDir = (String) hdfsSite.get("dfs.journalnode.edits.dir");
return hdfsSite;
}

@@ -23,6 +23,7 @@
import org.apache.bigtop.manager.stack.core.enums.ConfigType;
import org.apache.bigtop.manager.stack.core.exception.StackException;
import org.apache.bigtop.manager.stack.core.spi.param.Params;
import org.apache.bigtop.manager.stack.core.utils.LocalSettings;
import org.apache.bigtop.manager.stack.core.utils.linux.LinuxFileUtils;
import org.apache.bigtop.manager.stack.core.utils.linux.LinuxOSUtils;

@@ -33,8 +34,12 @@
import lombok.extern.slf4j.Slf4j;

import java.io.File;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.text.MessageFormat;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

@Slf4j
@NoArgsConstructor(access = AccessLevel.PRIVATE)
@@ -75,6 +80,14 @@ public static ShellResult configure(Params params, String componentName) {
Constants.PERMISSION_755,
true);
}
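// Create the JournalNode edits directory with hadoop ownership before the daemon is started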
case "journalnode": {
LinuxFileUtils.createDirectories(
hadoopParams.getDfsJournalNodeDir(),
hadoopUser,
hadoopGroup,
Constants.PERMISSION_755,
true);
}
case "datanode": {
LinuxFileUtils.createDirectories(
hadoopParams.getDfsDomainSocketPathPrefix(),
@@ -221,9 +234,13 @@ public static ShellResult configure(Params params, String componentName) {
public static void formatNameNode(HadoopParams hadoopParams) {
if (!isNameNodeFormatted(hadoopParams)) {
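// -force lets formatting proceed without prompting when the name directories already exist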
String formatCmd = MessageFormat.format(
"{0}/hdfs --config {1} namenode -format -nonInteractive",
"{0}/hdfs --config {1} namenode -format -nonInteractive -force",
hadoopParams.binDir(), hadoopParams.confDir());
try {
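// In HA mode the format writes the initial edits to the JournalNodes, so make sure they are reachable first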
boolean allJnReachable = checkAllJournalNodesPortReachable(hadoopParams);
if (!allJnReachable) {
throw new StackException("Cannot format NameNode: Some JournalNodes are unreachable.");
}
LinuxOSUtils.sudoExecCmd(formatCmd, hadoopParams.user());
} catch (Exception e) {
throw new StackException(e);
@@ -240,6 +257,59 @@ public static void formatNameNode(HadoopParams hadoopParams) {
}
}

private static boolean checkAllJournalNodesPortReachable(HadoopParams hadoopParams) throws InterruptedException {
List<String> journalNodeList = LocalSettings.componentHosts("journalnode");
String port = hadoopParams.getJournalHttpPort();
if (journalNodeList == null || journalNodeList.isEmpty()) {
throw new IllegalArgumentException("JournalNode host list cannot be empty!");
}
if (port == null || port.isEmpty()) {
throw new IllegalArgumentException("JournalNode port is not configured!");
}
int retryCount = 0;
int maxRetry = 100;
long retryIntervalMs = 2000;
int connectTimeoutMs = 1000;
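// Probe each JournalNode's edits port, retrying until all respond or the retry budget is exhausted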
while (retryCount < maxRetry) {
boolean allReachable = true;
for (String host : journalNodeList) {
try (Socket socket = new Socket()) {
socket.connect(new InetSocketAddress(host, Integer.parseInt(port)), connectTimeoutMs);
log.info("JournalNode [{}:{}] is reachable.", host, port);
} catch (Exception e) {
allReachable = false;
log.warn(
"JournalNode [{}:{}] is NOT reachable (retry {}/{}). Error: {}",
host,
port,
retryCount + 1,
maxRetry,
e.getMessage());
}
}
}
if (allReachable) {
log.info("All {} JournalNodes are reachable. Proceeding to format NameNode.", journalNodeList.size());
return true;
}
retryCount++;
if (retryCount < maxRetry) {
log.info("Waiting {}ms before next retry ({} remaining).", retryIntervalMs, maxRetry - retryCount);
TimeUnit.MILLISECONDS.sleep(retryIntervalMs);
}
}
log.error("Failed to reach all JournalNodes after {} retries. JournalNode list: {}", maxRetry, journalNodeList);
return false;
}

public static boolean isNameNodeFormatted(HadoopParams hadoopParams) {

boolean isFormatted = false;
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bigtop.manager.stack.bigtop.v3_3_0.hadoop;

import org.apache.bigtop.manager.common.shell.ShellResult;
import org.apache.bigtop.manager.stack.core.exception.StackException;
import org.apache.bigtop.manager.stack.core.spi.param.Params;
import org.apache.bigtop.manager.stack.core.spi.script.AbstractServerScript;
import org.apache.bigtop.manager.stack.core.spi.script.Script;
import org.apache.bigtop.manager.stack.core.utils.linux.LinuxOSUtils;

import com.google.auto.service.AutoService;
import lombok.extern.slf4j.Slf4j;

import java.text.MessageFormat;
import java.util.Properties;

@Slf4j
@AutoService(Script.class)
public class JournalNodeScript extends AbstractServerScript {
@Override
public ShellResult add(Params params) {
Properties properties = new Properties();
properties.setProperty(PROPERTY_KEY_SKIP_LEVELS, "1");

return super.add(params, properties);
}

@Override
public ShellResult configure(Params params) {
super.configure(params);

return HadoopSetup.configure(params, getComponentName());
}

@Override
public ShellResult start(Params params) {
configure(params);
HadoopParams hadoopParams = (HadoopParams) params;

String cmd = MessageFormat.format("{0}/hdfs --daemon start journalnode", hadoopParams.binDir());
try {
return LinuxOSUtils.sudoExecCmd(cmd, hadoopParams.user());
} catch (Exception e) {
throw new StackException(e);
}
}

@Override
public ShellResult stop(Params params) {
HadoopParams hadoopParams = (HadoopParams) params;
String cmd = MessageFormat.format("{0}/hdfs --daemon stop journalnode", hadoopParams.binDir());
try {
return LinuxOSUtils.sudoExecCmd(cmd, hadoopParams.user());
} catch (Exception e) {
throw new StackException(e);
}
}

@Override
public ShellResult status(Params params) {
HadoopParams hadoopParams = (HadoopParams) params;
return LinuxOSUtils.checkProcess(hadoopParams.getJournalNodePidFile());
}

@Override
public String getComponentName() {
return "journalnode";
}
}