时间:2021-05-20
原理介绍
在Kafka官网中这么描述AdminClient:The AdminClient API supports managing and inspecting topics, brokers, acls, and other Kafka objects. 具体的KafkaAdminClient包含了一下几种功能(以Kafka1.0.0版本为准):
其内部原理是使用Kafka自定义的一套二进制协议来实现,详细可以参见Kafka协议。主要实现步骤:
客户端根据方法的调用创建相应的协议请求,比如创建Topic的createTopics方法,其内部就是发送CreateTopicRequest请求。
客户端发送请求至Kafka Broker。
Kafka Broker处理相应的请求并回执,比如与CreateTopicRequest对应的是CreateTopicResponse。
客户端接收相应的回执并进行解析处理。
和协议有关的请求和回执的类基本都在org.apache.kafka.common.requests包中,AbstractRequest和AbstractResponse是这些请求和回执类的两个基本父类。
代码如下
@Componentpublic class KafkaConfig{ // 配置Kafka public Properties getProps(){ Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); return props; }}@RestControllerpublic class KafkaTopicManager { @Autowired private KafkaConfig kafkaConfig; @GetMapping("createTopic") public void createTopic(){ AdminClient adminClient = KafkaAdminClient.create(kafkaConfig.getProps()); NewTopic newTopic = new NewTopic("test1",4, (short) 1); Collection<NewTopic> newTopicList = new ArrayList<>(); newTopicList.add(newTopic); adminClient.createTopics(newTopicList); adminClient.close(); } @GetMapping("deleteTopic") public void deleteTopic(){ AdminClient adminClient = KafkaAdminClient.create(kafkaConfig.getProps()); adminClient.deleteTopics(Arrays.asList("test1")); adminClient.close(); } @GetMapping("listAllTopic") public void listAllTopic(){ AdminClient adminClient = KafkaAdminClient.create(kafkaConfig.getProps()); ListTopicsResult result = adminClient.listTopics(); KafkaFuture<Set<String>> names = result.names(); try { names.get().forEach((k)->{ System.out.println(k); }); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } adminClient.close(); } @GetMapping("getTopic") public void getTopic(){ AdminClient adminClient = KafkaAdminClient.create(kafkaConfig.getProps()); DescribeTopicsResult describeTopics = adminClient.describeTopics(Arrays.asList("syn-test")); Collection<KafkaFuture<TopicDescription>> values = describeTopics.values().values(); if(values.isEmpty()){ System.out.println("找不到描述信息"); }else{ for (KafkaFuture<TopicDescription> value : values) { System.out.println(value); } } adminClient.close(); }}以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。
声明:本页内容来源网络,仅供用户参考;我单位不保证亦不表示资料全面及准确无误,也不保证亦不表示这些资料为最新信息,如因任何原因,本网内容或者用户因倚赖本网内容造成任何损失或损害,我单位将不会负任何法律责任。如涉及版权问题,请提交至online#300.cn邮箱联系删除。
1.NumPy安装使用pip包管理工具进行安装复制代码代码如下:$sudopipinstallnumpy使用pip包管理工具安装ipython(交互式shell
和Windows下各种双击安装直接使用的数据库管理工具不同,Linux下的数据库管理工具显得有些稍稍复杂。由于版权和收费限制,很多好用的数据库管理工具例如Dat
今天,给大家解释的是营销工具中的会员关系管理工具。会员关系管理工具是帮助卖家管理自己会员的工具。通过会员关系管理工具卖家可以充分了解自己会员的信息;针对不同的会
一、什么是鲁班-电商广告管理工具?鲁班-电商广告管理工具是为电商广告主量身定做,用于创建商品页,并进行店铺管理、订单管理、数据信息查询的管理工具。鲁班-电商广告
admin组件使用Django提供了基于web的管理工具。Django自动管理工具是django.contrib的一部分。你可以在项目的settings.py中