Skip to content

Commit f1314f9

Browse files
tongyiming and mikatong authored
add ckafka datasource (#1827)
* add ckafka datasource * add changelog --------- Co-authored-by: mikatong <mikatong@tencent.com>
1 parent eeb1396 commit f1314f9

19 files changed

+7114
-0
lines changed

.changelog/1827.txt

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
```release-note:new-data-source
2+
tencentcloud_ckafka_datahub_topic
3+
```
4+
5+
```release-note:new-data-source
6+
tencentcloud_ckafka_datahub_group_offsets
7+
```
8+
9+
```release-note:new-data-source
10+
tencentcloud_ckafka_datahub_task
11+
```
12+
13+
```release-note:new-data-source
14+
tencentcloud_ckafka_group
15+
```
16+
17+
```release-note:new-data-source
18+
tencentcloud_ckafka_group_offsets
19+
```
Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
/*
2+
Use this data source to query detailed information of ckafka datahub_group_offsets
3+
4+
Example Usage
5+
6+
```hcl
data "tencentcloud_ckafka_datahub_group_offsets" "datahub_group_offsets" {
  name  = "topic-name"
  group = "group-name"
}
```
10+
*/
11+
package tencentcloud
12+
13+
import (
14+
"context"
15+
16+
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
17+
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
18+
ckafka "github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/ckafka/v20190819"
19+
"github.com/tencentcloudstack/terraform-provider-tencentcloud/tencentcloud/internal/helper"
20+
)
21+
22+
// dataSourceTencentCloudCkafkaDatahubGroupOffsets returns the schema of the
// tencentcloud_ckafka_datahub_group_offsets data source, which looks up the
// consumer-group offsets of a CKafka DataHub topic.
func dataSourceTencentCloudCkafkaDatahubGroupOffsets() *schema.Resource {
	return &schema.Resource{
		Read: dataSourceTencentCloudCkafkaDatahubGroupOffsetsRead,
		Schema: map[string]*schema.Schema{
			// Required: the DataHub topic whose group offsets are queried.
			"name": {
				Required:    true,
				Type:        schema.TypeString,
				Description: "topic name that the task subscribe.",
			},

			// Required: the Kafka consumer group to report offsets for.
			"group": {
				Required:    true,
				Type:        schema.TypeString,
				Description: "Kafka consumer group.",
			},

			// Optional fuzzy filter on topic name.
			"search_word": {
				Optional:    true,
				Type:        schema.TypeString,
				Description: "fuzzy match topicName.",
			},

			// Computed result: one entry per topic, each carrying its
			// per-partition offset details.
			"topic_list": {
				Type:        schema.TypeList,
				Computed:    true,
				Description: "The topic array, where each element is a json object.",
				Elem: &schema.Resource{
					Schema: map[string]*schema.Schema{
						"topic": {
							Type:        schema.TypeString,
							Computed:    true,
							Description: "topic name.",
						},
						"partitions": {
							Type:        schema.TypeList,
							Computed:    true,
							Description: "The topic partition array, where each element is a json object.",
							Elem: &schema.Resource{
								Schema: map[string]*schema.Schema{
									"partition": {
										Type:        schema.TypeInt,
										Computed:    true,
										Description: "topic partitionId.",
									},
									"offset": {
										Type:        schema.TypeInt,
										Computed:    true,
										Description: "consumer offset.",
									},
									"metadata": {
										Type:        schema.TypeString,
										Computed:    true,
										Description: "Usually an empty string.",
									},
									"error_code": {
										Type:        schema.TypeInt,
										Computed:    true,
										Description: "Error Code.",
									},
									"log_end_offset": {
										Type:        schema.TypeInt,
										Computed:    true,
										Description: "partition Log End Offset.",
									},
									"lag": {
										Type:        schema.TypeInt,
										Computed:    true,
										Description: "The number of unconsumed messages.",
									},
								},
							},
						},
					},
				},
			},

			// Optional path; when set, the read handler also dumps the
			// result to this file via writeToFile.
			"result_output_file": {
				Type:        schema.TypeString,
				Optional:    true,
				Description: "Used to save results.",
			},
		},
	}
}
106+
107+
func dataSourceTencentCloudCkafkaDatahubGroupOffsetsRead(d *schema.ResourceData, meta interface{}) error {
108+
defer logElapsed("data_source.tencentcloud_ckafka_datahub_group_offsets.read")()
109+
defer inconsistentCheck(d, meta)()
110+
111+
logId := getLogId(contextNil)
112+
113+
ctx := context.WithValue(context.TODO(), logIdKey, logId)
114+
115+
paramMap := make(map[string]interface{})
116+
if v, ok := d.GetOk("name"); ok {
117+
paramMap["name"] = helper.String(v.(string))
118+
}
119+
120+
if v, ok := d.GetOk("group"); ok {
121+
paramMap["group"] = helper.String(v.(string))
122+
}
123+
124+
if v, ok := d.GetOk("search_word"); ok {
125+
paramMap["search_word"] = helper.String(v.(string))
126+
}
127+
128+
service := CkafkaService{client: meta.(*TencentCloudClient).apiV3Conn}
129+
130+
var result []*ckafka.GroupOffsetTopic
131+
132+
err := resource.Retry(readRetryTimeout, func() *resource.RetryError {
133+
groupOffsetTopics, e := service.DescribeCkafkaDatahubGroupOffsetsByFilter(ctx, paramMap)
134+
if e != nil {
135+
return retryError(e)
136+
}
137+
result = groupOffsetTopics
138+
return nil
139+
})
140+
if err != nil {
141+
return err
142+
}
143+
144+
ids := make([]string, 0, len(result))
145+
topicList := make([]map[string]interface{}, 0, len(result))
146+
for _, topic := range result {
147+
topicMap := make(map[string]interface{})
148+
149+
if topic.Topic != nil {
150+
topicMap["topic"] = topic.Topic
151+
ids = append(ids, *topic.Topic)
152+
153+
}
154+
155+
if topic.Partitions != nil {
156+
partitionsList := make([]map[string]interface{}, 0)
157+
for _, partitions := range topic.Partitions {
158+
partitionsMap := map[string]interface{}{}
159+
160+
if partitions.Partition != nil {
161+
partitionsMap["partition"] = partitions.Partition
162+
}
163+
164+
if partitions.Offset != nil {
165+
partitionsMap["offset"] = partitions.Offset
166+
}
167+
168+
if partitions.Metadata != nil {
169+
partitionsMap["metadata"] = partitions.Metadata
170+
}
171+
172+
if partitions.ErrorCode != nil {
173+
partitionsMap["error_code"] = partitions.ErrorCode
174+
}
175+
176+
if partitions.LogEndOffset != nil {
177+
partitionsMap["log_end_offset"] = partitions.LogEndOffset
178+
}
179+
180+
if partitions.Lag != nil {
181+
partitionsMap["lag"] = partitions.Lag
182+
}
183+
184+
partitionsList = append(partitionsList, partitionsMap)
185+
}
186+
187+
topicMap["partitions"] = partitionsList
188+
}
189+
190+
topicList = append(topicList, topicMap)
191+
192+
}
193+
194+
d.SetId(helper.DataResourceIdsHash(ids))
195+
_ = d.Set("topic_list", topicList)
196+
197+
output, ok := d.GetOk("result_output_file")
198+
if ok && output.(string) != "" {
199+
if e := writeToFile(output.(string), topicList); e != nil {
200+
return e
201+
}
202+
}
203+
return nil
204+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package tencentcloud
2+
3+
import (
4+
"testing"
5+
6+
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
7+
)
8+
9+
// TestAccTencentCloudCkafkaDatahubGroupOffsetsDataSource_basic is an acceptance
// test: it applies the fixture config below and checks that the data source
// sets a non-empty ID. Runs only with acceptance testing enabled and requires
// a prepaid test account (ACCOUNT_TYPE_PREPAY).
func TestAccTencentCloudCkafkaDatahubGroupOffsetsDataSource_basic(t *testing.T) {
	t.Parallel()
	resource.Test(t, resource.TestCase{
		PreCheck:  func() { testAccPreCheckCommon(t, ACCOUNT_TYPE_PREPAY) },
		Providers: testAccProviders,
		Steps: []resource.TestStep{
			{
				Config: testAccCkafkaDatahubGroupOffsetsDataSource,
				// Passes as long as the data source was read and got an ID.
				Check: resource.ComposeTestCheckFunc(testAccCheckTencentCloudDataSourceID("data.tencentcloud_ckafka_datahub_group_offsets.datahub_group_offsets")),
			},
		},
	})
}
22+
23+
// testAccCkafkaDatahubGroupOffsetsDataSource is the Terraform configuration for
// the acceptance test above. The referenced topic and consumer group are
// long-lived fixtures that must already exist in the test account.
const testAccCkafkaDatahubGroupOffsetsDataSource = `

data "tencentcloud_ckafka_datahub_group_offsets" "datahub_group_offsets" {
  name = "1308726196-keep-topic"
  group = "topic-lejrlafu-test"
}
`

0 commit comments

Comments
 (0)