
Kafka

With our Kafka integration, you can import brokers and topics from your Kafka clusters into Port, according to your mapping and definitions.

Common use cases

  • Map brokers and topics in your Kafka clusters.
  • Watch for object changes (create/update/delete) on a schedule, and automatically apply the changes to your entities in Port.
  • Create/delete Kafka objects using self-service actions.

Prerequisites

To install the integration, you need a Kubernetes cluster that the integration's Helm chart will be deployed to.

Please make sure that you have kubectl and helm installed on your machine, and that your kubectl CLI is connected to the Kubernetes cluster where you plan to install the integration.

Installation

Choose one of the following installation methods:

Using this installation option means that the integration will be able to update Port in real time using webhooks.

This table summarizes the available parameters for the installation. Set them as you wish in the script below, then copy it and run it in your terminal:

| Parameter | Description | Required |
| --- | --- | --- |
| port.clientId | Your Port client id | ✅ |
| port.clientSecret | Your Port client secret | ✅ |
| integration.secrets.clusterConfMapping | The mapping of Kafka cluster names to Kafka client config | ✅ |
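For reference, integration.secrets.clusterConfMapping is a JSON object keyed by a cluster name of your choice, where each value is a Kafka client configuration object. A minimal sketch for two hypothetical clusters (the cluster names, broker addresses, and the specific client options shown are illustrative assumptions; the exact options you need depend on your Kafka setup):

```json
{
  "local": {
    "bootstrap.servers": "localhost:9092"
  },
  "prod": {
    "bootstrap.servers": "broker-1.example.com:9092,broker-2.example.com:9092",
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "PLAIN",
    "sasl.username": "<USERNAME>",
    "sasl.password": "<PASSWORD>"
  }
}
```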

Advanced configuration

| Parameter | Description |
| --- | --- |
| integration.eventListener.type | The event listener type. Read more about event listeners |
| integration.type | The integration to be installed |
| scheduledResyncInterval | The number of minutes between each resync. When not set, the integration will resync for each event listener resync event. Read more about scheduledResyncInterval |
| initializePortResources | Default true. When set to true, the integration will create default blueprints and the Port app config mapping. Read more about initializePortResources |

To install the integration using Helm, run the following command:

helm repo add --force-update port-labs https://port-labs.github.io/helm-charts
helm upgrade --install kafka port-labs/port-ocean \
--set port.clientId="PORT_CLIENT_ID" \
--set port.clientSecret="PORT_CLIENT_SECRET" \
--set port.baseUrl="https://api.getport.io" \
--set initializePortResources=true \
--set scheduledResyncInterval=60 \
--set integration.identifier="my-kafka-integration" \
--set integration.type="kafka" \
--set integration.eventListener.type="POLLING" \
--set-json integration.secrets.clusterConfMapping='{"local": {"bootstrap.servers": "localhost:9092"}}'
Advanced integration configuration

For advanced configuration such as proxies or self-signed certificates, click here.

Ingesting Kafka objects

The Kafka integration uses a YAML configuration to describe the process of loading data into the developer portal.

Here is an example snippet from the configuration which demonstrates the process for getting cluster data from Kafka:

createMissingRelatedEntities: false
deleteDependentEntities: true
resources:
  - kind: cluster
    selector:
      query: "true"
    port:
      entity:
        mappings:
          identifier: .name
          title: .name
          blueprint: '"kafkaCluster"'
          properties:
            controllerId: .controller_id

The integration makes use of the JQ JSON processor to select, modify, concatenate, transform and perform other operations on existing fields and values from Kafka's metadata objects.
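As a rough illustration of what the JQ mappings compute, here is a minimal Python analogue (for explanation only, not how the integration is implemented) that applies the cluster mapping above to a sample cluster object:

```python
# Sample Kafka cluster object, matching the example responses below.
cluster = {"name": "local", "controller_id": "1"}

# Each mapping value is a JQ query evaluated against the object:
entity = {
    "identifier": cluster["name"],       # JQ: .name
    "title": cluster["name"],            # JQ: .name
    "blueprint": "kafkaCluster",         # JQ: '"kafkaCluster"' (a hardcoded string)
    "properties": {
        "controllerId": cluster["controller_id"],  # JQ: .controller_id
    },
}
print(entity["identifier"])  # → local
```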

Configuration structure

The integration configuration determines which resources will be queried from Kafka, and which entities and properties will be created in Port.

Supported resources

The following resources can be used to map data from Kafka. It is possible to reference any field that appears in the example responses below in the mapping configuration.
Cluster example
{
  "name": "local",
  "controller_id": "1"
}
Broker example
{
  "id": "1",
  "address": "localhost:9092/1",
  "cluster_name": "local",
  "config": {"key": "value", ...}
}
Topic example
{
  "name": "__consumer_offsets",
  "cluster_name": "local",
  "partitions": [
    {
      "id": 0,
      "leader": 2,
      "replicas": [2, 1, 3],
      "isrs": [3, 2, 1]
    }
  ],
  "config": {"key": "value", ...}
}
  • The root key of the integration configuration is the "resources" key:

    resources:
      - kind: cluster
        selector:
        ...
  • The "kind" key is a specifier for a Kafka object:

    resources:
      - kind: cluster
        selector:
        ...
  • The "selector" and the "query" keys allow you to filter which objects of the specified "kind" will be ingested into your software catalog:

    resources:
      - kind: cluster
        selector:
          query: "true" # JQ boolean expression. If evaluated to false - this object will be skipped.
        port:
  • The "port", "entity" and the "mappings" keys are used to map the Kafka object fields to Port entities. To create multiple mappings of the same kind, you can add another item in the "resources" array:

    resources:
      - kind: cluster
        selector:
          query: "true"
        port:
          entity:
            mappings: # Mappings between one Kafka cluster to a Port entity. Each value is a JQ query.
              identifier: .name
              title: .name
              blueprint: '"kafkaCluster"'
              properties:
                controllerId: .controller_id
      - kind: cluster # In this instance cluster is mapped again with a different filter
        selector:
          query: '.name == "MyClusterName"'
        port:
          entity:
            mappings: ...
Blueprint key

Note the value of the "blueprint" key: if you want to use a hardcoded string, you need to wrap it in 2 sets of quotes. For example, use a pair of single-quotes (') and then wrap that with a pair of double-quotes (").
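The quoting rule can be illustrated with a short config fragment: the outer quotes belong to YAML, and the value JQ receives must itself be a quoted string literal for it to evaluate to a constant:

```yaml
# Both values below are JQ expressions:
blueprint: '"kafkaCluster"'   # JQ receives "kafkaCluster" -> the constant string kafkaCluster
identifier: .name             # JQ receives .name -> evaluated against the Kafka object
```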

Ingest data into Port

To ingest Kafka objects using the integration configuration, you can follow the steps below:

  1. Go to the DevPortal Builder page.
  2. Select a blueprint you want to ingest using Kafka.
  3. Choose the Ingest Data option from the menu.
  4. Select Kafka under the Event Processing providers category.
  5. Modify the configuration according to your needs.
  6. Click Resync.

Examples

Examples of blueprints and the relevant integration configurations:

Cluster

Cluster blueprint
{
  "identifier": "kafkaCluster",
  "title": "Cluster",
  "icon": "Kafka",
  "schema": {
    "properties": {
      "controllerId": {
        "title": "Controller ID",
        "type": "string"
      }
    }
  }
}
Integration configuration
createMissingRelatedEntities: false
deleteDependentEntities: true
resources:
  - kind: cluster
    selector:
      query: "true"
    port:
      entity:
        mappings:
          identifier: .name
          title: .name
          blueprint: '"kafkaCluster"'
          properties:
            controllerId: .controller_id

Broker

Broker blueprint
{
  "identifier": "kafkaBroker",
  "title": "Broker",
  "icon": "Kafka",
  "schema": {
    "properties": {
      "address": {
        "title": "Address",
        "type": "string"
      },
      "region": {
        "title": "Region",
        "type": "string"
      },
      "version": {
        "title": "Version",
        "type": "string"
      },
      "config": {
        "title": "Config",
        "type": "object"
      }
    }
  },
  "relations": {
    "cluster": {
      "target": "kafkaCluster",
      "required": true,
      "many": false
    }
  }
}
Integration configuration
createMissingRelatedEntities: false
deleteDependentEntities: true
resources:
  - kind: broker
    selector:
      query: "true"
    port:
      entity:
        mappings:
          identifier: .cluster_name + "_" + (.id | tostring)
          title: .cluster_name + " " + (.id | tostring)
          blueprint: '"kafkaBroker"'
          properties:
            address: .address
            region: .config."broker.rack"
            version: .config."inter.broker.protocol.version"
            config: .config
          relations:
            cluster: .cluster_name
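To make the identifier mapping concrete, here is a small Python analogue (illustration only) of the JQ expression .cluster_name + "_" + (.id | tostring), applied to the broker example shown earlier:

```python
# Broker object, matching the broker example response above.
broker = {"id": "1", "address": "localhost:9092/1", "cluster_name": "local"}

# JQ: .cluster_name + "_" + (.id | tostring)
# str() mirrors JQ's tostring, which also handles numeric ids.
identifier = broker["cluster_name"] + "_" + str(broker["id"])
print(identifier)  # → local_1
```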

Topic

Topic blueprint
{
  "identifier": "kafkaTopic",
  "title": "Topic",
  "icon": "Kafka",
  "schema": {
    "properties": {
      "replicas": {
        "title": "Replicas",
        "type": "number"
      },
      "partitions": {
        "title": "Partitions",
        "type": "number"
      },
      "compaction": {
        "title": "Compaction",
        "type": "boolean"
      },
      "retention": {
        "title": "Retention",
        "type": "boolean"
      },
      "deleteRetentionTime": {
        "title": "Delete Retention Time",
        "type": "number"
      },
      "partitionsMetadata": {
        "title": "Partitions Metadata",
        "type": "array"
      },
      "config": {
        "title": "Config",
        "type": "object"
      }
    }
  },
  "relations": {
    "cluster": {
      "target": "kafkaCluster",
      "required": true,
      "many": false
    },
    "brokers": {
      "target": "kafkaBroker",
      "required": false,
      "many": true
    }
  }
}
Integration configuration
createMissingRelatedEntities: false
deleteDependentEntities: true
resources:
  - kind: topic
    selector:
      query: "true"
    port:
      entity:
        mappings:
          identifier: .cluster_name + "_" + .name
          title: .cluster_name + " " + .name
          blueprint: '"kafkaTopic"'
          properties:
            replicas: .partitions[0].replicas | length
            partitions: .partitions | length
            compaction: .config."cleanup.policy" | contains("compact")
            retention: .config."cleanup.policy" | contains("delete")
            deleteRetentionTime: .config."delete.retention.ms"
            partitionsMetadata: .partitions
            config: .config
          relations:
            cluster: .cluster_name
            brokers: '[.cluster_name + "_" + (.partitions[].replicas[] | tostring)] | unique'
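The brokers relation builds one broker entity identifier per unique replica id across all of the topic's partitions. Here is a Python analogue (illustration only) of that JQ expression, run against a hypothetical two-partition topic:

```python
# Hypothetical topic object in the shape of the topic example response.
topic = {
    "name": "payments",
    "cluster_name": "local",
    "partitions": [
        {"id": 0, "leader": 2, "replicas": [2, 1, 3], "isrs": [3, 2, 1]},
        {"id": 1, "leader": 1, "replicas": [1, 3], "isrs": [1, 3]},
    ],
}

# JQ: [.cluster_name + "_" + (.partitions[].replicas[] | tostring)] | unique
# (JQ's `unique` deduplicates and sorts, so sorted(set) mirrors it here.)
brokers = sorted({
    f'{topic["cluster_name"]}_{replica}'
    for partition in topic["partitions"]
    for replica in partition["replicas"]
})
print(brokers)  # → ['local_1', 'local_2', 'local_3']
```

The resulting identifiers match the broker identifier mapping above (.cluster_name + "_" + (.id | tostring)), which is what allows the relation to resolve.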