Endpoints
Although SQL is a well-established and simple language that can be learned in just a few days, Kubling was originally designed to integrate with other systems, rather than being directly accessed by end users.
However, modern applications rarely interact with databases using plain SQL queries. To address this, we incorporated a mechanism similar to the way virtual databases are defined, allowing Kubling to expose HTTP endpoints for interacting with VDBs, schemas, and entities, offering a more flexible and accessible way to work with data.
These endpoints logic are defined using YAML
files, parsed as templates, and registered to the engine endpoints list during initialization.
YAML
files must be located in the Descriptor Bundle.
Endpoints are divided into two different types: Queries and Actions.
The examples below assume that the functions are part of a module named mymod
.
Queries
Allow to dynamically expose services that return a collection of rows using a SQL query.
Let's break down the definition of an endpoint using the following sample:
{% set vdb = "k8s" %}
{% set tables = mymod__getTablesByTags(vdb=_context.vdb, schema="all", tags="kubernetes;deployment") %}
---
type: sqlDirect
virtualDatabase: {{ vdb }}
neededFields:
- namespace
query: |
{% set first = true %}
{% for table in tables %}
{{ "UNION ALL" | filterValWhenVarFalse(var="first", flip=true) }}
SELECT * FROM {{ table }} WHERE metadata__namespace = '{{ namespace }}'
{% endfor %}
ORDER BY clusterName
This endpoint returns Kubernetes Deployments of all clusters defined as Data Source, running in a specified namespace.
The first line just sets a value to the vdb
variable. Although it is not really useful here since the value does not change, it allows add custom logic when
having multiple Virtual Databases.
Second line is really interesting, since we get the list of TABLE
names, of all SCHEMAS
whose tags contain kubernetes
and deployment
.
sqlDirect
type (the only one supported by now), tells the engine that the query
field contains a valid Kubling SQL query.
In the line 8
, we define the input parameter fields needed by this endpoint, only one in this case. When defined, the engine expects the request sent by the client containing
a body document, in JSON
or YAML
format, with values for these fields.
When the template is parsed, these fields are injected in the template context.
What we want to have as a resulting query, is just a UNION
query that aggregates results from multiple TABLES
. To do so, we use a
loop to iterate over tables.
The UNION ALL
keyword is placed between two SELECT
expressions, therefore it must be omitted when printing the first SELECT
. To achieve that, we use a
very simple filter function called filterValWhenVarFalse
, that returns null
when the context variable passed as var
param is false
and the actual filtered
value when it is true
. When flip
param is true
, the boolean
value is flipped each time the filter is called.
More information about filters here.
Calling a Query Endpoint
The URL for calling a query endpoint is http[s]://[address]:8282/api/v1/query/perform/[query_name]
where query_name
is the name of the YAML
file (without the extension).
Example: POST
http://localhost:8282/api/v1/query/perform/get_all_deployments
with body as follows:
{ "namespace": "08abb0fc-f7af-4fe8-98d4-e76729567dc8" }
Actions
The purpose of actions is to define a sequence of operations that are performed across other systems, typically represented as entities within a VDB. In essence, an action represents the required operations to achieve a desired upstream state, using Kubling SQL queries.
The concept of the desired state is crucial, as Kubling's primary goal is to apply a data-centric approach to operations, ensuring consistency across various operational platforms and systems. However, controlling the internal state of upstream systems often relies on APIs that may not be transactional. For example, in Kubernetes, when a new resource descriptor is applied (create or update), the API only provides a promise upon receipt, not a confirmation that the resource has been fully created and is operational.
In this context, an entity defined in Kubling must remain consistent. When an INSERT
, UPDATE
, or DELETE
operation is performed on an entity,
Kubling will execute the corresponding actions on the dependent system. Assuming the upstream system will eventually fulfill the request,
an INSERT
on the entity will return the current upstream state, maintaining the necessary consistency.
Let's use the following sample:
# ********* SECTION 1 *********
{% set vdb = "App" %}
{% set componentName = mymod__get_k8s_component_info(_context.vdb, _context.component_id) %}
{% set clusterSchema = mymod__find_best_cluster_for_deploy(_context.vdb, _context.component_id, _context.env) %}
{% set selectorUUID = uuid() %}
---
# ********* SECTION 2 *********
needed:
fields:
- component_id
- env
- containers
before:
- name: "Component must exist"
virtualDatabase: {{ vdb }}
entity: "app_db.COMPONENT"
filters:
- field: "ID"
value: {{ component_id }}
operation: EQUAL
assertThat: exists
message: "Component {{ component_id }} does no exist."
- name: "Deployment does not yet exist"
virtualDatabase: {{ vdb }}
entity: "app_db.COMPONENT_DEPLOYMENT_JOIN"
filters:
- field: "component_id"
value: {{ component_id }}
operation: EQUAL
- field: "environment"
value: {{ env }}
operation: "EQUAL"
assertThat: does_not_exist
message: "Deployment {{ componentName }} of Application {{ app_id }} environment {{ env }} already exists."
# ********* SECTION 3 *********
operations:
- name: "namespace_insert"
type: "insert"
virtualDatabase: {{ vdb }}
entity: "{{ clusterSchema }}.NAMESPACE"
skipWhenTrue: {{ mymod__ns_exists(_context.vdb, _context.clusterSchema, _context.component_id) }}
valueAssignments:
- field: "metadata__name"
value: "{{ component_id }}"
waitUntilEffective:
maxSeconds: 15
checkEverySeconds: 5
byGetting:
fields:
- "metadata__name"
usingFilterFieldValuePairs:
metadata__name: "{{ component_id }}"
status__phase: "Active"
- name: "deployment_insert"
type: "insert"
virtualDatabase: {{ vdb }}
entity: "{{ clusterSchema }}.DEPLOYMENT"
valueAssignments:
- field: "identifier"
value: "xxx"
- field: "metadata__namespace"
value: "{{ component_id }}"
- field: "metadata__name"
value: "{{ componentName }}"
- field: "spec__template__metadata__labels"
dataType: "json"
value:
mgmt.kubling.com/managed: "appmodel"
mgmt.kubling.com/uid: {{ selectorUUID }}
- field: "spec__selector__matchLabels"
dataType: "json"
value:
mgmt.kubling.com/managed: "appmodel"
mgmt.kubling.com/uid: {{ selectorUUID }}
- field: "spec__template__spec__containers"
dataType: "json"
value:
{% for container in fromYamlArrayToIterable(containers) %}
- name: {{ container.name }}
image: {{ container.image }}
imagePullPolicy: IfNotPresent
{% for envEntry in fromYamlArrayToIterable(container.env) %}
env:
- name: {{ envEntry.name }}
value: {{ envEntry.value }}
{% endfor %}
resources:
requests:
cpu: {{ container.cpuRequest }}
memory: {{ container.memRequest }}
{% for port in fromYamlArrayToIterable(container.ports) %}
ports:
- containerPort: {{ port.portNumber }}
{% endfor %}
{% endfor %}
waitUntilEffective:
maxSeconds: 60
checkEverySeconds: 5
byGetting:
fields:
- "identifier"
usingFilterFields:
- "metadata__namespace"
- "metadata__name"
putInContextVar: newAppliedDeployment
- name: "join_insert"
type: "insert"
virtualDatabase: {{ vdb }}
entity: "app_db.COMPONENT_DEPLOYMENT_JOIN"
valueAssignments:
- field: "component_id"
value: "{{ component_id }}"
- field: "environment"
value: "{{ env }}"
- field: "deployment_identifier"
value: "{{ deferValueProcessing(contextVar='newAppliedDeployment.identifier') }}"
To simplify understanding, we can break the action down into three sections, as noted in the code with comments.
Section 1 (Header)
This section contains the initial variables required in the subsequent sections.
mymod__get_k8s_component_info
and mymod__find_best_cluster_for_deploy
are user-defined template functions.
The function mymod__find_best_cluster_for_deploy
is particularly important because it returns the name of the optimal cluster schema
where a component should be deployed, based on the rules defined within the function itself.
Section 2 (Prerequisites)
In this section, the engine checks preconditions before performing any operations.
The needed.fields
attribute operates similarly to query.neededFields
, meaning the request body must contain a JSON
object with all the necessary fields.
Additionally, this section includes a before
element, which defines a list of assertions. If any assertion fails, the action will return an error.
For example, the assertion Component must exist
ensures that the component being deployed is present in the application database, thus preserving consistency.
Behind the scenes, this assertion is translated into a Kubling SQL query that verifies the existence of the relevant records, ensuring that the deployment
targets valid entities.
Section 3 (Operations)
In this particular section, let's review operation by operation.
namespace_insert
This operation creates a new namespace in the selected Kubernetes cluster.
Although the operation is idempotent, the user-defined template function mymod__ns_exists
, which returns a boolean
indicating whether the namespace already exists,
is used to determine if the action should be skipped via the skipWhenTrue
attribute.
The valueAssignments
section specifies the list of fields and their respective values, which is translated into a standard SQL INSERT
statement in the format:
INSERT INTO table_name (column1, column2, column3, ...) VALUES (value1, value2, value3, ...)
.
Finally, the waitUntilEffective
attribute plays a crucial role. Since actions form a chain of interdependent steps (in essence, a path-like DAG),
some nodes may rely on values computed by previous nodes or just general states. waitUntilEffective
ensures that such dependencies are resolved by waiting until
the previous operations are fully effective before proceeding.
The usingFilterFieldValuePairs
attribute specifies the WHERE
clause used to verify the effectiveness of prior operations.
It lists the conditions in a key-value format (field name: value), ensuring the system can check the relevant data before proceeding.
deployment_insert
This operation contains, under the valueAssignments
, a field named spec__template__metadata__labels
, whose dataType
is a JSON
object.
As you can see, it is possible to pass full objects, which makes the operation easier when dealing with documents.
spec__template__spec__containers
is also a JSON
object but a bit more flexible. In this case, its value is built by iterating over the parameter containers
which
is expected to be an object like:
"containers": [
{
"name": "my-container",
"image": "nginx",
"cpuRequest": "1m",
"memRequest": "250m",
"env": [
{
"name": "env1",
"value": "val1"
}
],
"ports": [
{
"portNumber": 8282
}
]
}
]
Also, in this case waitUntilEffective
puts the result of identifier
in a template context variable called newAppliedDeployment
.
join_insert
When the template parser reads the file, it resolves all blocks within delimiters at once.
If a particular variable referenced is not present in the context, the parser returns null
This operation needs the value of newAppliedDeployment
created in the previous step, therefore we need to explicitly tell the parser that
the processing must be deferred, using a function called deferValueProcessing
. That function promises the parser that at some point of the execution
the context will contain the newAppliedDeployment
variable whose value must be assigned to deployment_identifier
.
Calling an Action Endpoint
The URL for calling a query endpoint is:
POST
http[s]://[address]:8282/api/v1/actions/run/[action_name]
Where action_name
is the name of the YAML
file (without the extension).
Dealing with errors
Each operation composing an Action is atomic.
It is sometimes challenging since an Action with several operations might fail at some point, which makes the operation to stop and return an error.
To help with that, an operation can contain a sub-operation which is triggered in case of failure, as follows:
needed:
fields:
- name
- env
- containers
before:
...
operations:
- name: "namespace_insert"
type: "insert"
virtualDatabase: {{ vdb }}
entity: "{{ clusterSchema }}.NAMESPACE"
skipWhenTrue: {{ mymod__ns_exists(_context.vdb, _context.clusterSchema, _context.component_id) }}
valueAssignments:
- field: "metadata__name"
value: "{{ component_id }}"
...
- name: "pv_insert"
type: "insert"
virtualDatabase: {{ vdb }}
entity: "{{ clusterSchema }}.PERSISTENT_VOLUME"
...
rollback:
type: "delete"
filters:
...
- name: "deployment_insert"
type: "insert"
virtualDatabase: {{ vdb }}
entity: "{{ clusterSchema }}.DEPLOYMENT"
...
rollback:
type: "delete"
filters:
...
Operations deployment_insert
and pv_insert
declare a rollback delete
operation that deletes the entry inserted by the operation, meaning that a Persistent Volume is not
kept in the cluster if we can't create a Deployment.
In general, when an error occurs, a new path-like DAG with a reverse direction is generated, as shown in the following diagram:
In this context, rollback
is a generic term we've assigned to the sub-operation to prevent any misunderstanding;
however, it does not correspond to a traditional database rollback. The more accurate term to describe Kubling's internal mechanism is "compensation".
This distinction is important as compensation entails applying corrective actions to achieve the desired state rather than reverting to a previous state or preventing
change merging, as a rollback would.
This document (opens in a new tab) might help in understanding the concept.
More information about Transactions in Kubling.