Hunting for and validating SSL certs with our first Argo Workflow.

Chris Haessig
7 min readJan 2, 2024

For years I have been managing kubernetes clusters. Running k8s for a long period of time, certificates are always an interesting challenge.

There are a lot of different ways to manage certificates, with hashicorps vault and certificate manager being my favorite. Check out my older post on this.

Sometimes you inherit legacy clusters that don’t have the newer tools. This usually leads to certificates that expire breaking things in the process. In this post we will walk through a simple Argo Workflow setup. We will create our first Workflow which will locate certificates and check when they will expire.

You can learn more about workflows here.

Workflow setup.

I usually prefer to run multiple kubernetes clusters this day and age. This means we will install Argo Workflows on our Argo specific k8s clusters. We can configure Workflows to communicate with other k8s clusters.

Starting on the argo cluster. We install Argo Workflows

kubectl create namespace argo
kubectl apply -n argo -f https://github.com/argoproj/argo-workflows/releases/download/v<<ARGO_WORKFLOWS_VERSION>>/install.yaml

kubectl patch deployment \
argo-server \
--namespace argo \
--type='json' \
-p='[{"op": "replace", "path": "/spec/template/spec/containers/0/args", "value": [
"server",
"--auth-mode=server"
]}]'

Once all the pods start, we should be able to to port forward to the UI.

kubectl -n argo port-forward deployment/argo-server 2746:2746

Next thing we want to do is get a bearer token and API endpoint of the cluster Workflows will connect with, to do the cert validation.

Getting permissions

With kubernetes, if you have network access, an api endpoint and bearer token, you have everything you should need to connect to the cluster and make api calls.

To generate a token, we connect to the destination cluster , and run the create token command, which will return the bearer token we need.

# k8s clusters we are scanning for,  not argo cluster 

kubectl create token argotest -n default

We can test this by using curl and the token with the Authorization header.

TOKEN=<output above>
API=https://fake.elb.amazonaws.com # endpoint of the k8s cluster
curl --header "Authorization: Bearer $TOKEN" -k $API

Returning info, we can confirmed it worked.

Back to Workflows

So when we execute a Workflow, it will launch a pod on the k8s cluster that we installed Argo Workflows on. Workflows will run a python script that we will soon create to connect to the destination cluster and perform our scan looking for expired certificates.

What we need to do is create a Workflow CRD .

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: cert-validator-
spec:
entrypoint: k8s-script
templates:
- name: k8s-script
inputs:
artifacts:
- name: python-script
path: /tmp/script.py
raw:
data: |
< Python script here >
container:
image: python:3.11
command:
- "/bin/sh"
- "-c"
- |
pip install kubernetes cryptography && python /tmp/script.py
env:
- name: token
valueFrom:
secretKeyRef:
name: workpass
key: token
- name: k8surl
valueFrom:
secretKeyRef:
name: workpass
key: k8surl
- name: search_days
value: "500"

Above is what a Workflow looks like. This is WITHOUT the python script we will soon create.

A lot of self explanatory things, like we give it a name “cert-validator-”. We want the pod to start with the python:3.11 container and execute the python script we will create. We also define some environment variables the pod will need to connect to the remote cluster.

This is running on a separate argo cluster, so we need to give it the bearer token , api endpoint and the search-days params to trigger when a cert is about to expire.

The Workflow above grabs those out of a k8s secret we need to create.

kubectl create secret generic workpass --from-literal=token=$TOKEN --from-literal=k8surl=$APIENDPOINT -n argo

Building our python script.

We want to create a python script to connect to our destination kubernetes cluster.

                    configuration = client.Configuration()
configuration.host = os.environ["k8surl"]
configuration.verify_ssl = False

configuration.api_key = {
"authorization": "Bearer {}".format(os.environ["token"])
}

# Create an API client with the configured settings
api_instance = client.CoreV1Api(client.ApiClient(configuration))

Then iterate through all the secrets on the cluster . We assume it’s a certificate if it has the word .crt or .pem in the filename.

                   # Search all namespaces
namespaces = api_instance.list_namespace().items

# Loop through all namespaces

for namespace in namespaces:

secrets = api_instance.list_namespaced_secret(namespace.metadata.name).items

# Iterate through each secret in the namespace
for secret in secrets:

if secret.data:
for key, value in secret.data.items():
# Only validate certs that have the crt or pem name in the name.
if '.pem' in key or '.crt' in key:
raw_cert = base64.b64decode(value).decode("utf-8")

# Send to the check_cert function to see how long its valid for
check_cert(key, raw_cert, secret.metadata.namespace)

Passing each certificate we found to the check_cert function, we process the certificate using the python cryptograph library, which returns the remaining days the cert is validate for.

               
def check_cert(certname, rawcert, namespace):

current_date = datetime.now()

# Alert on any certs that will expire in 30 days or less
expire_soon = int(os.environ["search_days"])

try:
results = x509.load_pem_x509_certificate(rawcert.encode(), default_backend())
expiration_date = results.not_valid_after

# Calculate the remaining days until expiration
remaining_days = (expiration_date - current_date).days

except:
# Ignore errors, probally a bad idea , but what else can we really do ?
return

# Under 1 day or -1 the cert is expired
if remaining_days < 1:
print(certname, " is expired from namespace, ", namespace)

elif remaining_days < expire_soon:
print(certname, " will expire in ", remaining_days, " days in namespace ",namespace )

# main function to search for certs in the secrets API

Here is the entire Workflow

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: cert-validator-
spec:
entrypoint: k8s-script
templates:
- name: k8s-script
inputs:
artifacts:
- name: python-script
path: /tmp/script.py
raw:
data: |
from kubernetes import client, config
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from datetime import datetime
import os
import base64

# Added this block, to ignore SSL warnings

import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

# check_cert function to validate cert

def check_cert(certname, rawcert, namespace):
current_date = datetime.now()

# Alert on any certs that will expire in 30 days or less
expire_soon = int(os.environ["search_days"])

try:
results = x509.load_pem_x509_certificate(rawcert.encode(), default_backend())
expiration_date = results.not_valid_after

# Calculate the remaining days until expiration
remaining_days = (expiration_date - current_date).days

except:
# Ignore errors, probally a bad idea , but what else can we really do ?
return

# Under 1 day or -1 the cert is expired
if remaining_days < 1:
print(certname, " is expired from namespace, ", namespace)

elif remaining_days < expire_soon:
print(certname, " will expire in ", remaining_days, " days in namespace ",namespace )

# main function to search for certs in the secrets API

def main():
configuration = client.Configuration()
configuration.host = os.environ["k8surl"]
configuration.verify_ssl = False

configuration.api_key = {
"authorization": "Bearer {}".format(os.environ["token"])
}

# Create an API client with the configured settings
api_instance = client.CoreV1Api(client.ApiClient(configuration))

# Search all namespaces
namespaces = api_instance.list_namespace().items

# Loop through all namespaces

for namespace in namespaces:

secrets = api_instance.list_namespaced_secret(namespace.metadata.name).items

# Iterate through each secret in the namespace
for secret in secrets:

if secret.data:
for key, value in secret.data.items():

# Only validate certs that have the crt or pem name in the name.
if '.pem' in key or '.crt' in key:
raw_cert = base64.b64decode(value).decode("utf-8")

# Send to the check_cert function to see how long its valid for
check_cert(key, raw_cert, secret.metadata.namespace)

if __name__ == "__main__":
main()
container:
image: python:3.11
command:
- "/bin/sh"
- "-c"
- |
pip install kubernetes cryptography && python /tmp/script.py
env:
- name: token
valueFrom:
secretKeyRef:
name: workpass
key: token
- name: k8surl
valueFrom:
secretKeyRef:
name: workpass
key: k8surl
- name: search_days
value: "350"

Running on Argo Workflows.

Download the argo cli

brew install argo

Then use the argo submit command , we pass the above manifest. This will create a pod on our Argo Workflows cluster and execute our defined python script.

argo submit -n argo --watch workflow.yml

Name: cert-validator-42mjk
Namespace: argo
ServiceAccount: unset (will run with the default ServiceAccount)
Status: Running
Created: Tue Jan 02 02:27:07 -0800 (1 second ago)
Started: Tue Jan 02 02:27:07 -0800 (1 second ago)
Duration: 1 second
Progress: 0/1

STEP TEMPLATE PODNAME DURATION MESSAGE
◷ cert-validator-42mjk k8s-script cert-validator-42mjk 1s
kubectl get pods -n argo                               READY   STATUS      RESTARTS   AGE
argo-server-76fcd9c66f-fq7gb 1/1 Running 0 3d8h
cert-validator-42mjk 0/2 Running 0 39s

We can also watch the job via the Argo Workflow UI as well.

Checking out the logs, we can see our python script executed successfully , connected to our destination cluster and found all the certs that will expire in the next 350 days or less.

cert-validator-tdz7t: [notice] A new release of pip is available: 23.2.1 -> 23.3.2
cert-validator-tdz7t: [notice] To update, run: pip install --upgrade pip
cert-validator-tdz7t: ca.crt will expire in 355 days in namespace cert-manager
cert-validator-tdz7t: tls.crt will expire in 312 days in namespace cert-manager
cert-validator-tdz7t: tls.pem will expire in 210 days in namespace testing
cert-validator-tdz7t: ca.pem will expire in 355 days in namespace testing
cert-validator-tdz7t: ca.pem will expire in 300 days in namespace testing
cert-validator-tdz7t: server.pem will expire in 289 days in namespace testing
cert-validator-tdz7t: time="2024-01-02T10:31:20.852Z" level=info msg="sub-process exited" ar
go=true error="<nil>"

There is a lot more to Argo Workflows , but we have sucessfully created our first job.

Profit ?

--

--