时间:2021-05-26
本文并没有kafka的安装教程,本文是针对已经安装kafka及其配置好kafka的php拓展并且使用laravel框架进行开发项目,配置一个可供laravel框架使用的生产及消费者类.
以下代码修改自本站的YII框架关于kafka类的代码,经过测试使用在本人的项目中,可正常运行,larvael版本:5.6 代码放置larvael框架位置:app/Tools/Kafka.php
<?phpnamespace App\Tools; use Illuminate\Config\Repository; use Illuminate\Support\Facades\DB;use Monolog\Logger;use Monolog\Handler\StreamHandler; use Illuminate\Http\Request; class Kafka{ public $broker_list = '127.0.0.1';//配置kafka,可以用逗号隔开多个kafka public $topic = 'test';//管道名称 public $partition = 0; protected $producer = null; protected $consumer = null; public function __construct() { if (empty($this->broker_list)) { throw new InvalidConfigException("broker not config"); } $rk = new \RdKafka\Producer(); if (empty($rk)) { throw new InvalidConfigException("producer error"); } $rk->setLogLevel(LOG_DEBUG); if (!$rk->addBrokers($this->broker_list)) { throw new InvalidConfigException("producer error"); } $this->producer = $rk; } /** * 生产者 * @param array $messages * @return mixed */ public function send($messages = [],$topic) { $topic = $this->producer->newTopic($topic); return $topic->produce(RD_KAFKA_PARTITION_UA, $this->partition, json_encode($messages)); } /** * 消费者 */ public function consumer($object, $callback){ $conf = new \RdKafka\Conf(); $conf->set('group.id', 0); $conf->set('metadata.broker.list', $this->broker_list); $topicConf = new \RdKafka\TopicConf(); $topicConf->set('auto.offset.reset', 'smallest'); $conf->setDefaultTopicConf($topicConf); $consumer = new \RdKafka\KafkaConsumer($conf); $consumer->subscribe([$this->topic]); echo "waiting for messages.....\n"; while(true) { $message = $consumer->consume(120*1000); switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: echo "message payload...."; $object->$callback($message->payload); break; } sleep(1); } }}?>在控制器中如何使用:
首先再头部导入这个类:use App\Tools\Kafka;
下面是使用生产者实例:
public function test(){ $topic = 'tool';//输入使用管道名称 $data['shop_id'] = 58; $data['bar_code']=586; $data['goods_num'] = 1; $data['goods_unit'] = '个'; $Kafka = new Kafka();$Error_Msg = $Kafka->send($data,$topic);//传入数组会自动转换jsonvar_dump($Error_Msg); }下面是消费者实例,消费者我这里使用了的是php脚本进行的操作:
<?php $conf = new RdKafka\Conf(); $conf->set('group.id', 'myConsumerGroup'); $rk = new RdKafka\Consumer($conf);$rk->addBrokers("localhost:9092"); $topicConf = new RdKafka\TopicConf();$topicConf->set('auto.commit.interval.ms', 100);$topicConf->set('offset.store.method', 'file');$topicConf->set('offset.store.path', sys_get_temp_dir());$topicConf->set('auto.offset.reset', 'smallest'); $topic = $rk->newTopic("tool", $topicConf);//读取的管道 // Start consuming partition 0$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED); while (true) { $message = $topic->consume(0, 120*10000); switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: //没有错误打印信息 $message = json_decode(json_encode($message),true); $data = json_decode($message['payload'],true); var_dump($data); break; case RD_KAFKA_RESP_ERR__PARTITION_EOF: echo "等待接收信息\n"; break; case RD_KAFKA_RESP_ERR__TIMED_OUT: echo "超时\n"; break; default: throw new \Exception($message->errstr(), $message->err); break; } sleep(1);} ?>到此这篇关于Laravel中Kafka的使用详解的文章就介绍到这了,更多相关Laravel中Kafka内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!
声明:本页内容来源网络,仅供用户参考;我单位不保证亦不表示资料全面及准确无误,也不保证亦不表示这些资料为最新信息,如因任何原因,本网内容或者用户因倚赖本网内容造成任何损失或损害,我单位将不会负任何法律责任。如涉及版权问题,请提交至online#300.cn邮箱联系删除。
springboot对kafka的client很好的实现了集成,使用非常方便,本文也实现了一个在springboot中实现操作kafka的demo。1.POM配
这里使用spring-kafka依赖和KafkaTemplate对象来操作Kafka服务。一、添加依赖和添加配置项1.1、在Pom文件中添加依赖org.spri
在一次项目中,因甲方需要使用kafka消息队列推送数据,所以需要接入kafka,并且kafka的版本是2.11。但是我们项目使用的是Springboot1.5.
Kafka的结构与RabbitMQ类似,消息生产者向Kafka服务器发送消息,Kafka接收消息后,再投递给消费者。生产者的消费会被发送到Topic中,Topi
Kafka(http://kafka.apache.org/)是由LinkedIn使用Scala编写的一个分布式消息系统,用作LinkedIn的活动流(Acti