Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
623 changes: 615 additions & 8 deletions packaging/src/kubernetes/README.md

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ rules:
- apiGroups: ["apps"]
resources: ["deployments", "statefulsets"]
verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
# Scale subresource for operator-driven autoscaling
- apiGroups: ["apps"]
resources: ["deployments/scale", "statefulsets/scale"]
verbs: ["get", "update", "patch"]
# Jobs for schema initialization
- apiGroups: ["batch"]
resources: ["jobs"]
Expand All @@ -46,7 +50,11 @@ rules:
- apiGroups: [""]
resources: ["events"]
verbs: ["create", "patch"]
# Pods: read-only for readiness checking
# Pods: read + patch (patch needed for pod-deletion-cost annotation)
- apiGroups: [""]
resources: ["pods"]
verbs: ["get", "list", "watch"]
verbs: ["get", "list", "watch", "patch"]
# PodDisruptionBudgets for graceful autoscaling
- apiGroups: ["policy"]
resources: ["poddisruptionbudgets"]
verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,18 @@ spec:
extraVolumeMounts:
{{- toYaml .Values.cluster.metastore.extraVolumeMounts | nindent 6 }}
{{- end }}
{{- if and .Values.cluster.metastore.autoscaling .Values.cluster.metastore.autoscaling.enabled }}
autoscaling:
enabled: true
minReplicas: {{ .Values.cluster.metastore.autoscaling.minReplicas }}
scaleUpThreshold: {{ .Values.cluster.metastore.autoscaling.scaleUpThreshold }}
scaleUpStabilizationSeconds: {{ .Values.cluster.metastore.autoscaling.scaleUpStabilizationSeconds }}
scaleDownStabilizationSeconds: {{ .Values.cluster.metastore.autoscaling.scaleDownStabilizationSeconds }}
gracePeriodSeconds: {{ .Values.cluster.metastore.autoscaling.gracePeriodSeconds }}
metricsScrapeIntervalSeconds: {{ .Values.cluster.metastore.autoscaling.metricsScrapeIntervalSeconds | default 10 }}
cpuScaleUpThreshold: {{ .Values.cluster.metastore.autoscaling.cpuScaleUpThreshold | default 90 }}
cpuScaleDownThreshold: {{ .Values.cluster.metastore.autoscaling.cpuScaleDownThreshold | default 30 }}
{{- end }}
{{- else }}
{{- if .Values.cluster.metastore.externalUri }}
externalUri: {{ .Values.cluster.metastore.externalUri | quote }}
Expand Down Expand Up @@ -96,6 +108,18 @@ spec:
extraVolumeMounts:
{{- toYaml .Values.cluster.hiveServer2.extraVolumeMounts | nindent 6 }}
{{- end }}
{{- if and .Values.cluster.hiveServer2.autoscaling .Values.cluster.hiveServer2.autoscaling.enabled }}
autoscaling:
enabled: true
minReplicas: {{ .Values.cluster.hiveServer2.autoscaling.minReplicas }}
scaleUpThreshold: {{ .Values.cluster.hiveServer2.autoscaling.scaleUpThreshold }}
scaleUpStabilizationSeconds: {{ .Values.cluster.hiveServer2.autoscaling.scaleUpStabilizationSeconds }}
scaleDownStabilizationSeconds: {{ .Values.cluster.hiveServer2.autoscaling.scaleDownStabilizationSeconds }}
gracePeriodSeconds: {{ .Values.cluster.hiveServer2.autoscaling.gracePeriodSeconds }}
metricsScrapeIntervalSeconds: {{ .Values.cluster.hiveServer2.autoscaling.metricsScrapeIntervalSeconds | default 10 }}
cpuScaleUpThreshold: {{ .Values.cluster.hiveServer2.autoscaling.cpuScaleUpThreshold | default 90 }}
cpuScaleDownThreshold: {{ .Values.cluster.hiveServer2.autoscaling.cpuScaleDownThreshold | default 30 }}
{{- end }}

llap:
enabled: {{ .Values.cluster.llap.enabled }}
Expand All @@ -120,6 +144,16 @@ spec:
extraVolumeMounts:
{{- toYaml .Values.cluster.llap.extraVolumeMounts | nindent 6 }}
{{- end }}
{{- if and .Values.cluster.llap.autoscaling .Values.cluster.llap.autoscaling.enabled }}
autoscaling:
enabled: true
minReplicas: {{ .Values.cluster.llap.autoscaling.minReplicas }}
scaleUpThreshold: {{ .Values.cluster.llap.autoscaling.scaleUpThreshold }}
scaleUpStabilizationSeconds: {{ .Values.cluster.llap.autoscaling.scaleUpStabilizationSeconds }}
scaleDownStabilizationSeconds: {{ .Values.cluster.llap.autoscaling.scaleDownStabilizationSeconds }}
gracePeriodSeconds: {{ .Values.cluster.llap.autoscaling.gracePeriodSeconds }}
metricsScrapeIntervalSeconds: {{ .Values.cluster.llap.autoscaling.metricsScrapeIntervalSeconds | default 10 }}
{{- end }}
{{- end }}

tezAm:
Expand All @@ -146,6 +180,15 @@ spec:
extraVolumeMounts:
{{- toYaml .Values.cluster.tezAm.extraVolumeMounts | nindent 6 }}
{{- end }}
{{- if and .Values.cluster.tezAm.autoscaling .Values.cluster.tezAm.autoscaling.enabled }}
autoscaling:
enabled: true
minReplicas: {{ .Values.cluster.tezAm.autoscaling.minReplicas }}
scaleUpStabilizationSeconds: {{ .Values.cluster.tezAm.autoscaling.scaleUpStabilizationSeconds }}
scaleDownStabilizationSeconds: {{ .Values.cluster.tezAm.autoscaling.scaleDownStabilizationSeconds }}
gracePeriodSeconds: {{ .Values.cluster.tezAm.autoscaling.gracePeriodSeconds }}
metricsScrapeIntervalSeconds: {{ .Values.cluster.tezAm.autoscaling.metricsScrapeIntervalSeconds | default 10 }}
{{- end }}
{{- end }}

zookeeper:
Expand Down Expand Up @@ -176,4 +219,15 @@ spec:
volumeMounts:
{{- toYaml .Values.cluster.storage.volumeMounts | nindent 4 }}
{{- end }}

{{- if and .Values.cluster.autoSuspend .Values.cluster.autoSuspend.enabled }}
autoSuspend:
enabled: true
idleTimeoutMinutes: {{ .Values.cluster.autoSuspend.idleTimeoutMinutes | default 15 }}
{{- if hasKey .Values.cluster.autoSuspend "includeMetastore" }}
includeMetastore: {{ .Values.cluster.autoSuspend.includeMetastore }}
{{- end }}
{{- end }}

suspend: false
{{- end }}
62 changes: 62 additions & 0 deletions packaging/src/kubernetes/helm/hive-operator/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,20 @@ cluster:
# mountPath: /etc/gcs
# readOnly: true

# ---------------------------------------------------------------------------
# AUTO-SUSPEND — fully hibernates the cluster after idle timeout
# ---------------------------------------------------------------------------
# When enabled (requires autoscaling on all active components), the operator
# scales the entire cluster to 0 replicas after all components have been idle
# for idleTimeoutMinutes. Use kubectl patch to manually suspend/wake:
# kubectl patch hivecluster hive --type=merge -p '{"spec":{"suspend":true}}'
# kubectl patch hivecluster hive --type=merge -p '{"spec":{"suspend":false}}'
autoSuspend:
enabled: false
idleTimeoutMinutes: 15
# Set to false to keep the Metastore (HMS) running during suspend; HMS autoscaling is then not required
includeMetastore: true

# ---------------------------------------------------------------------------
# METASTORE — defaults to enabled, 2 replicas (HA)
# ---------------------------------------------------------------------------
Expand All @@ -112,6 +126,19 @@ cluster:
configOverrides: {}
extraVolumes: []
extraVolumeMounts: []
# Autoscaling (operator-driven, no external dependencies)
# The operator scrapes JMX Exporter metrics from pods directly.
# When enabled, 'replicas' above acts as the max replica ceiling.
autoscaling:
enabled: false
minReplicas: 1
scaleUpThreshold: 100
scaleUpStabilizationSeconds: 60
scaleDownStabilizationSeconds: 300
gracePeriodSeconds: 60
metricsScrapeIntervalSeconds: 10
cpuScaleUpThreshold: 90
cpuScaleDownThreshold: 30
# Set to use an external Metastore instead of deploying one:
# enabled: false
# externalUri: "thrift://external-metastore:9083"
Expand All @@ -127,6 +154,18 @@ cluster:
externalJars: []
extraVolumes: []
extraVolumeMounts: []
# Autoscaling (operator-driven, no external dependencies)
# When enabled, 'replicas' above acts as the max replica ceiling
autoscaling:
enabled: false
minReplicas: 1
scaleUpThreshold: 100
scaleUpStabilizationSeconds: 60
scaleDownStabilizationSeconds: 600
gracePeriodSeconds: 300
metricsScrapeIntervalSeconds: 10
cpuScaleUpThreshold: 90
cpuScaleDownThreshold: 30

# ---------------------------------------------------------------------------
# LLAP — enabled by default for full-HA
Expand All @@ -141,6 +180,17 @@ cluster:
configOverrides: {}
extraVolumes: []
extraVolumeMounts: []
# Autoscaling (operator-driven, no external dependencies)
# minReplicas: 0 enables scale-to-zero — scales up when HS2 has active sessions
# When enabled, 'replicas' above acts as the max replica ceiling
autoscaling:
enabled: false
minReplicas: 0
scaleUpThreshold: 10
scaleUpStabilizationSeconds: 60
scaleDownStabilizationSeconds: 900
gracePeriodSeconds: 600
metricsScrapeIntervalSeconds: 10

# ---------------------------------------------------------------------------
# TEZ AM — enabled by default for full-HA
Expand All @@ -154,3 +204,15 @@ cluster:
configOverrides: {}
extraVolumes: []
extraVolumeMounts: []
# Autoscaling (operator-driven, no external dependencies)
# minReplicas: 0 enables scale-to-zero — wakes when HS2 receives queries
# When enabled, 'replicas' above acts as the max replica ceiling
# TezAM scales demand-based: max(totalSessions, hs2Pods * sessionsPerQueue)
# No scaleUpThreshold needed — scaling is 1:1 with session demand
autoscaling:
enabled: false
minReplicas: 0
scaleUpStabilizationSeconds: 60
scaleDownStabilizationSeconds: 600
gracePeriodSeconds: 120
metricsScrapeIntervalSeconds: 10
8 changes: 7 additions & 1 deletion packaging/src/kubernetes/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,14 @@
<version>${fabric8.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.16</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<artifactId>log4j-slf4j2-impl</artifactId>
<version>${log4j2.version}</version>
</dependency>
<dependency>
Expand Down Expand Up @@ -189,6 +194,7 @@
<executable>docker</executable>
<arguments>
<argument>build</argument>
<argument>--no-cache</argument>
<argument>-t</argument>
<argument>apache/hive:operator-${project.version}</argument>
<argument>.</argument>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@
package org.apache.hive.kubernetes.operator;

import io.javaoperatorsdk.operator.Operator;
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
import io.javaoperatorsdk.operator.api.config.ResolvedControllerConfiguration;
import org.apache.hive.kubernetes.operator.model.HiveCluster;
import org.apache.hive.kubernetes.operator.reconciler.HiveClusterReconciler;
import org.apache.hive.kubernetes.operator.reconciler.HiveWorkflowSpec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -36,7 +40,16 @@ private HiveOperatorMain() {
public static void main(String[] args) {
LOG.info("Starting Hive Kubernetes Operator");
Operator operator = new Operator();
operator.register(new HiveClusterReconciler());
HiveClusterReconciler reconciler = new HiveClusterReconciler();
// Get the annotation-derived base config, then inject our programmatic workflow spec.
ControllerConfiguration<HiveCluster> baseConfig =
operator.getConfigurationService().getConfigurationFor(reconciler);
HiveWorkflowSpec workflowSpec = new HiveWorkflowSpec();
((ResolvedControllerConfiguration<HiveCluster>) baseConfig)
.setWorkflowSpec(workflowSpec);
LOG.info("Registered workflow with {} dependent resource specs",
workflowSpec.getDependentResourceSpecs().size());
operator.register(reconciler, baseConfig);
operator.start();
LOG.info("Hive Kubernetes Operator started successfully");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hive.kubernetes.operator.autoscaling;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runs periodic metrics scraping in the background so that the JOSDK reconcile
 * thread is never blocked by HTTP calls to pod JMX exporters.
 * <p>
 * Each component gets its own scheduled task that writes results to a shared
 * {@link MetricsCache}. The reconciler reads from that cache (non-blocking).
 * <p>
 * All tasks run on one daemon thread named {@code hive-metrics-scraper}, so
 * scrapes execute one at a time and a slow scrape delays the ones queued
 * behind it.
 * <p>
 * Registration state is kept in a single map that is only mutated through
 * atomic per-key operations, so a failed or concurrent registration can never
 * leave an interval recorded without a matching live task.
 */
public class BackgroundMetricsScraper {

  private static final Logger LOG = LoggerFactory.getLogger(BackgroundMetricsScraper.class);

  private final ScheduledExecutorService scheduler;
  private final MetricsScraper scraper;
  private final MetricsCache cache;
  // Key: "namespace/clusterName/component" -> interval + active scrape task.
  // Interval and task live in one value so they are always replaced together.
  private final ConcurrentHashMap<String, Registration> registrations =
      new ConcurrentHashMap<>();

  /**
   * Creates a scraper backed by a single-threaded daemon scheduler.
   *
   * @param scraper performs the actual scrape for a namespace/selector/port
   * @param cache   destination for successfully scraped metrics
   */
  public BackgroundMetricsScraper(MetricsScraper scraper, MetricsCache cache) {
    this.scraper = scraper;
    this.cache = cache;
    this.scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
      Thread t = new Thread(r, "hive-metrics-scraper");
      // Daemon so a lingering scrape never keeps the JVM alive on exit.
      t.setDaemon(true);
      return t;
    });
  }

  /**
   * Registers (or updates) a periodic scrape task for a component.
   * Idempotent — only recreates the task if the interval has changed; a change
   * to {@code selector} or {@code metricsPort} alone does not replace an
   * existing task.
   * <p>
   * The replacement task is scheduled <em>before</em> the previous one is
   * cancelled. If scheduling fails, the previous registration (if any) is left
   * untouched and keeps running.
   *
   * @param namespace    the Kubernetes namespace
   * @param clusterName  the HiveCluster name
   * @param component    the component name (e.g., "hiveserver2")
   * @param selector     label selector for pod listing
   * @param metricsPort  the JMX exporter port
   * @param intervalSecs how often to scrape (from AutoscalingSpec); must be positive
   * @throws IllegalArgumentException if {@code intervalSecs} is not positive
   * @throws java.util.concurrent.RejectedExecutionException if called after {@link #shutdown()}
   */
  public void registerOrUpdate(String namespace, String clusterName,
      String component, Map<String, String> selector,
      int metricsPort, int intervalSecs) {
    String key = namespace + "/" + clusterName + "/" + component;
    // Validate up front: the scheduler rejects non-positive delays, and we must
    // not disturb an existing registration for a request that can never succeed.
    if (intervalSecs <= 0) {
      throw new IllegalArgumentException(
          "Scrape interval must be positive for " + key + " but was " + intervalSecs);
    }

    // compute() makes the check-and-replace atomic for this key. If the
    // remapping function throws, the existing mapping is left unchanged.
    registrations.compute(key, (k, current) -> {
      if (current != null && current.intervalSecs == intervalSecs) {
        return current; // Already registered with same interval
      }

      ScheduledFuture<?> task = scheduler.scheduleWithFixedDelay(
          () -> scrapeAndStore(key, namespace, selector, metricsPort),
          0, intervalSecs, TimeUnit.SECONDS);

      // Only retire the old task once its replacement is safely scheduled.
      if (current != null) {
        current.task.cancel(false);
      }
      LOG.debug("Registered background scrape for {} (interval={}s)", key, intervalSecs);
      return new Registration(intervalSecs, task);
    });
  }

  /**
   * Unregisters all scrape tasks for a deleted cluster and drops the cached
   * metrics stored under the cluster's key prefix.
   *
   * @param namespace   the Kubernetes namespace
   * @param clusterName the HiveCluster name
   */
  public void unregisterCluster(String namespace, String clusterName) {
    String prefix = namespace + "/" + clusterName + "/";
    registrations.entrySet().removeIf(entry -> {
      if (entry.getKey().startsWith(prefix)) {
        // cancel(false): let an in-flight scrape finish rather than interrupt it.
        entry.getValue().task.cancel(false);
        return true;
      }
      return false;
    });
    // Purge after the registrations are gone: scrapeAndStore only writes for
    // keys that are still registered, so nothing is re-added afterwards.
    cache.removeByPrefix(prefix);
    LOG.debug("Unregistered background scrape tasks for {}/{}", namespace, clusterName);
  }

  /**
   * Shuts down the background scheduler. Called on operator shutdown.
   */
  public void shutdown() {
    scheduler.shutdownNow();
  }

  /**
   * Performs one scrape and publishes the result to the cache.
   * <p>
   * The result is written only while {@code key} is still registered, so a
   * scrape that was already running when its cluster was unregistered cannot
   * put an entry back into the cache after it was purged.
   */
  private void scrapeAndStore(String key, String namespace,
      Map<String, String> selector, int metricsPort) {
    try {
      List<PodMetrics> metrics = scraper.scrape(namespace, selector, metricsPort);
      // Atomic with respect to removal of this key in unregisterCluster.
      registrations.computeIfPresent(key, (k, registration) -> {
        cache.put(key, metrics);
        return registration;
      });
    } catch (Exception e) {
      // Do not update cache on failure — staleness check handles it.
      // Swallowing here also keeps the periodic task alive: an exception that
      // escapes a scheduled task suppresses all of its future executions.
      // The throwable is passed as the trailing argument so the cause is kept.
      LOG.debug("Background scrape failed for {}: {}", key, e.getMessage(), e);
    }
  }

  /** Immutable pairing of a scrape interval with the task scheduled for it. */
  private static final class Registration {
    private final int intervalSecs;
    private final ScheduledFuture<?> task;

    private Registration(int intervalSecs, ScheduledFuture<?> task) {
      this.intervalSecs = intervalSecs;
      this.task = task;
    }
  }
}
Loading
Loading