python连接elasticsearch并将数据传入kafka
LiuSw Lv6

python连接elasticsearch并将数据传入kafka

脚本功能:

配置本地配置config.ini(不存在运行时会自动创建),ptyhon连接本地sqlite数据库文件获取数据,校验数据在elasticsearch索引中是否存在,不存在则传入kafka,在由kafka自动传入logstash 最后传入elasticsearch中。

代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
# -*- coding:utf-8 -*-

import sqlite3,json,socket,os
import sys # 1
from datetime import datetime
from elasticsearch import Elasticsearch
from kafka import KafkaProducer
import configparser

#########################################################################################
# Variable needs to be modified
# 需要修改的变量
#########################################################################################
def getVariable():
# 集群地址
# host1="192.168.11.156"
# host2="192.168.11.157"
# host3="192.168.11.159"
# kafka1="192.168.11.156:9092"
# kafka2="192.168.11.157:9092"
# kafka3="192.168.11.159:9092"
# 索引名称
# indexName="syslogdebug" + "-" + str(datetime.now().year) + "." + str(datetime.now().month)
# indexName2="syslogerror" + "-" + str(datetime.now().year) + "." + str(datetime.now().month)

global host1 ,host2 ,host3 ,kafkaPort, kafka1, kafka2, kafka3, indexName, indexName2, dbPath
host1 = str(config.get('variable', 'host1'))
host2 = str(config.get('variable', 'host2'))
host3 = str(config.get('variable', 'host3'))
kafkaPort = config.get('variable', 'kafkaPort')
kafka1 = config.get('variable', 'kafka1') + ":" + kafkaPort
kafka2 = config.get('variable', 'kafka2') + ":" + kafkaPort
kafka3 = config.get('variable', 'kafka3') + ":" + kafkaPort

# 索引名称
indexName = str(config.get('variable', 'indexName')) + "-" + str(datetime.now().year) + "." + str(datetime.now().month)
indexName2 = str(config.get('variable', 'indexName2')) + "-" + str(datetime.now().year) + "." + str(datetime.now().month)

# db文件位置
dbPath=str(config.get('variable', 'dbPath'))

print("\n[variable]")
print("host1="+host1+"\n"+"host2="+host2+"\n"+"host3"+host3)
print("kafkaPort="+kafkaPort+"\n"+"kafka1="+kafka1+"\n"+"kafka2="+kafka2+"\n"+"kafka3="+kafka3)
print("indexName="+indexName+"\n"+"indexName="+indexName+"")
print("dbPath="+dbPath+"\n")

#########################################################################################
# Default variable
# 默认的变量
#########################################################################################
def getDefault():
# es 端口 默认为9200
# port="9200"
# es 默认用户
# user="elastic"
# es 密码
# passwd="Root@123"

global port, user, passwd
port = str(config.get('default', 'port')).strip('"')
user = str(config.get('default', 'user')).strip('"')
passwd = str(config.get('default', 'passwd')).strip('"')
print("[default]")
print("port="+port+"\n"+"user="+user+"\n"+"passwd="+passwd+"\n")

##########################################################################################
# function: select ip
# 函数: 查询本机ip地址
##########################################################################################
def get_host_ip():
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(('8.8.8.8', 80))
ip = s.getsockname()[0]
finally:
s.close()
return ip


##########################################################################################
# function: select index is or not exists
# 函数: 查询索引是否存在
##########################################################################################
def indicesExists(my_index):
res = es.indices.exists(my_index)
return res

##########################################################################################
# function: create index
# 函数: 创建索引
##########################################################################################
def createIndex(my_index):
res = es.indices.create(index=my_index, ignore=400)
return res

def main():
#########################################################################################
# The system configuration
# 系统配置
#########################################################################################
reload(sys) # 2
sys.setdefaultencoding('utf-8') # 3

#########################################################################################
# Get IP
# 获取IP地址
#########################################################################################
global IP
IP=get_host_ip()


if __name__ == '__main__':
main()


#########################################################################################
# Reading configuration Files
# 读取配置文件
#########################################################################################
global config
# 生成ConfigParser对象
config = configparser.ConfigParser()
# 读取配置文件
filename = 'config.ini'
config.read(filename, encoding='utf-8')
# 获取Variable变量
getVariable()
# 获取Default变量
getDefault()

#########################################################################################
# create Elasticsearch connection
# 创建es连接
#########################################################################################
es = Elasticsearch(
[
{"host": host1, "port": port},
{"host": host2, "port": port},
{"host": host3, "port": port}
],
http_auth=(user, passwd),
timeout=3600
)

#########################################################################################
# create kafka connection
# 创建kafka连接
#########################################################################################
producer = KafkaProducer(
bootstrap_servers=[kafka1,kafka2,kafka3],
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

#########################################################################################
# check index exists if not exists to create
# 检查索引,不存在就创建
#########################################################################################
indexEx=indicesExists(indexName)
if indexEx != True:
createIndex(indexName)

#########################################################################################
# connect sqlite Datebase And select data
# 连接sqlite数据库,查询数据
#########################################################################################
conn = sqlite3.connect(dbPath)
c = conn.cursor()
cursor = c.execute("SELECT * from SysLogDebug")

#########################################################################################
# insert SysLogDebug data
# 插入SysLogDebug表数据
#########################################################################################
for row in cursor:
data = dict([("ip",IP),("logid",str(row[0])),("date",str(row[1])),("assemblyname",str(row[2])),("namespacename",str(row[3])),("classname",str(row[4])),("methodname",str(row[5])),("elapsedtime",str(row[6])),("message",str(row[7]))])
logidEx=es.search(index=indexName, doc_type='_doc', body={"query": {"match":{"logid": row[0]}}})
if str(logidEx).find("logid") == -1:
producer.send("syslogdebug", data)
# 测试
#if row[0] == 40:
# break

#########################################################################################
# check index exists if not exists to create
# 检查索引,不存在就创建
#########################################################################################
indexEx=indicesExists(indexName2)
if indexEx != True:
createIndex(indexName2)

#########################################################################################
# select data
# 查询数据
#########################################################################################
cursor2 = c.execute("SELECT * from SysLogError")

#########################################################################################
# insert SysLogError data
# 插入SysLogError表数据
#########################################################################################
for row in cursor2:
data2 = dict([("ip",IP),("logid",str(row[0])),("date",str(row[1])),("message",str(row[2]))])
logidEx2=es.search(index=indexName2, doc_type='_doc', body={"query": {"match":{"logid": row[0]}}})
if str(logidEx2).find("logid") == -1:
producer.send("syslogerror", data2)
#测试
#if row[0] == 15:
# break

#########################################################################################
# Close the connection
# 关闭连接
#########################################################################################
#producer.flush()
producer.close()
conn.close()

#python = sys.executable # 获取当前执行python
#os.execl(python, python, *sys.argv) # 执行命令
exit

 评论