时间:2021-05-22
这里以将Apache的日志写入到ElasticSearch为例,来演示一下如何使用Python将Spark数据导入到ES中。
实际工作中,由于数据与使用框架或技术的复杂性,数据的写入变得比较复杂,在这里我们简单演示一下。
如果使用Scala或Java的话,Spark提供自带了支持写入ES的支持库,但Python不支持。所以首先你需要去这里下载依赖的ES官方开发的依赖包包。
下载完成后,放在本地目录,以下面命令方式启动pyspark:
pyspark --jars elasticsearch-hadoop-6.4.1.jar
如果你想pyspark使用Python3,请设置环境变量:
export PYSPARK_PYTHON=/usr/bin/python3
理解如何写入ES的关键是要明白,ES是一个JSON格式的数据库,它有一个必须的要求。数据格式必须采用以下格式
{ "id: { the rest of your json}}
往下会展示如何转换成这种格式。
解析Apache日志文件
我们将Apache的日志文件读入,构建Spark RDD。然后我们写一个parse()函数用正则表达式处理每条日志,提取我们需要的字
rdd = sc.textFile("/home/ubuntu/walker/apache_logs")
regex='^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+)\s?(\S+)?\s?(\S+)?" (\d{3}|-) (\d+|-)\s?"?([^"]*)"?\s?"?([^"]*)?"?$'
换句话说,我们刚开始从日志文件读入RDD的数据类似如下:
['83.149.9.216 - - [17/May/2015:10:05:03 +0000] "GET /presentations/logstash-monitorama-2013/images/kibana-search.png HTTP/1.1" 200 203023 "http://semicomplete.com/presentations/logstash-monitorama-2013/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"']
然后我们使用map函数转换每条记录:
rdd2 = rdd.map(parse)
rdd2.take(1)
[{'date': '17/May/2015:10:05:03 +0000', 'ip': '83.149.9.216', 'operation': 'GET', 'uri': '/presentations/logstash-monitorama-2013/images/kibana-search.png'}]
现在看起来像JSON,但并不是JSON字符串,我们需要使用json.dumps将dict对象转换。
我们同时增加一个doc_id字段作为整个JSON的ID。在配置ES中我们增加如下配置“es.mapping.id”: “doc_id”告诉ES我们将这个字段作为ID。
这里我们使用SHA算法,将这个JSON字符串作为参数,得到一个唯一ID。
计算结果类似如下,可以看到ID是一个很长的SHA数值。
rdd3.take(1)
[('a5b086b04e1cc45fb4a19e2a641bf99ea3a378599ef62ba12563b75c', '{"date": "17/May/2015:10:05:03 +0000", "ip": "83.149.9.216", "operation": "GET", "doc_id": "a5b086b04e1cc45fb4a19e2a641bf99ea3a378599ef62ba12563b75c", "uri": "/presentations/logstash-monitorama-2013/images/kibana-search.png"}')]
现在我们需要制定ES配置,比较重要的两项是:
其他的配置自己去探索。
然后我们使用saveAsNewAPIHadoopFile()将RDD写入到ES。这部分代码对于所有的ES都是一样的,比较固定,不需要理解每一个细节
es_write_conf = { "es.nodes" : "localhost", "es.port" : "9200", "es.resource" : 'walker/apache', "es.input.json": "yes", "es.mapping.id": "doc_id" } rdd3.saveAsNewAPIHadoopFile( path='-', outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat", keyClass="org.apache.hadoop.io.NullWritable", valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", conf=es_write_conf)rdd3 = rdd2.map(addID)def addId(data): j=json.dumps(data).encode('ascii', 'ignore') data['doc_id'] = hashlib.sha224(j).hexdigest() return (data['doc_id'], json.dumps(data))最后我们可以使用curl进行查询
curl http://localhost:9200s/walker/apache/_search?pretty=true&?q=*{ "_index" : "walker", "_type" : "apache", "_id" : "227e977849bfd5f8d1fca69b04f7a766560745c6cb3712c106d590c2", "_score" : 1.0, "_source" : { "date" : "17/May/2015:10:05:32 +0000", "ip" : "91.177.205.119", "operation" : "GET", "doc_id" : "227e977849bfd5f8d1fca69b04f7a766560745c6cb3712c106d590c2", "uri" : "/favicon.ico" }如下是所有代码:
import jsonimport hashlibimport redef addId(data): j=json.dumps(data).encode('ascii', 'ignore') data['doc_id'] = hashlib.sha224(j).hexdigest() return (data['doc_id'], json.dumps(data))def parse(str): s=p.match(str) d = {} d['ip']=s.group(1) d['date']=s.group(4) d['operation']=s.group(5) d['uri']=s.group(6) return d regex='^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+)\s?(\S+)?\s?(\S+)?" (\d{3}|-) (\d+|-)\s?"?([^"]*)"?\s?"?([^"]*)?"?$'p=re.compile(regex)rdd = sc.textFile("/home/ubuntu/walker/apache_logs")rdd2 = rdd.map(parse)rdd3 = rdd2.map(addID)es_write_conf = { "es.nodes" : "localhost", "es.port" : "9200", "es.resource" : 'walker/apache', "es.input.json": "yes", "es.mapping.id": "doc_id" } rdd3.saveAsNewAPIHadoopFile( path='-', outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat", keyClass="org.apache.hadoop.io.NullWritable", valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", conf=es_write_conf)也可以这么封装,其实原理是一样的
import hashlibimport jsonfrom pyspark import Sparkcontextdef make_md5(line): md5_obj=hashlib.md5() md5_obj.encode(line) return md5_obj.hexdigest()def parse(line): dic={} l = line.split('\t') doc_id=make_md5(line) dic['name']=l[1] dic['age'] =l[2] dic['doc_id']=doc_id return dic #记得这边返回的是字典类型的,在写入es之前要记得dumpsdef saveData2es(pdd, es_host, port,index, index_type, key): """ 把saprk的运行结果写入es :param pdd: 一个rdd类型的数据 :param es_host: 要写es的ip :param index: 要写入数据的索引 :param index_type: 索引的类型 :param key: 指定文档的id,就是要以文档的那个字段作为_id :return: """ #实例es客户端记得单例模式 if es.exist.index(index): es.index.create(index, 'spo') es_write_conf = { "es.nodes": es_host, "es.port": port, "es.resource": index/index_type, "es.input.json": "yes", "es.mapping.id": key } (pdd.map(lambda _dic: ('', json.dumps(_dic)))) #这百年是为把这个数据构造成元组格式,如果传进来的_dic是字典则需要jdumps,如果传进来之前就已经dumps,这便就不需要dumps了 .saveAsNewAPIHadoopFile( path='-', outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat", keyClass="org.apache.hadoop.io.NullWritable", valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", conf=es_write_conf) )if __name__ == '__main__': #实例化sp对象 sc=Sparkcontext() #文件中的呢内容一行一行用sc的读取出来 json_text=sc.textFile('./1.txt') #进行转换 json_data=json_text.map(lambda line:parse(line)) saveData2es(json_data,'127.0.01','9200','index_test','index_type','doc_id') sc.stop()看到了把,面那个例子在写入es之前加了一个id,返回一个元组格式的,现在这个封装指定_id就会比较灵活了
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。
声明:本页内容来源网络,仅供用户参考;我单位不保证亦不表示资料全面及准确无误,也不保证亦不表示这些资料为最新信息,如因任何原因,本网内容或者用户因倚赖本网内容造成任何损失或损害,我单位将不会负任何法律责任。如涉及版权问题,请提交至online#300.cn邮箱联系删除。
MySQL与Elasticsearch数据不对称问题解决办法jdbc-input-plugin只能实现数据库的追加,对于elasticsearch增量写入,但经
pyspark是Spark对Python的api接口,可以在Python环境中通过调用pyspark模块来操作spark,完成大数据框架下的数据分析与挖掘。其中
最近做了一项工作需要把处理的数据写入到Excel表格中进行保存,所以在此就简单介绍使用Python如何把数据保存到excel表格中。数据导入之前需要安装xlwt
读取、写入和Python编写程序的最后一个基本步骤就是从文件读取数据和把数据写入文件。阅读完这篇文章之后,可以在自己的to-do列表中加上检验这个技能学习效果的
本文实例讲述了python文件写入的用法。分享给大家供大家参考。具体分析如下:Python中wirte()方法把字符串写入文件,writelines()方法可以