Neo4j Basics

With the rapid growth of industries such as social networking, e-commerce, finance, retail, and the Internet of Things, the real world has woven a vast and complex web of relationships that traditional databases struggle to compute over. In big data applications, the relationships between data points grow geometrically with data volume, so a database that supports relationship computation over massive, complex data is urgently needed: the graph database.

Definition

A graph database (GDB) is a database that uses graph structures for semantic queries, with nodes, edges, and properties to represent and store data. A key concept of the system is that it directly relates data items in the store to a collection of nodes and the edges that represent the relationships between them. These relationships allow data in the store to be linked together directly and, in many cases, retrieved with a single operation. Graph databases treat the relationships between data as a first-class concern. Querying relationships in a graph database is fast because the relationships are stored permanently in the database itself. Relationships can also be visualized intuitively, which makes graph databases very useful for highly interconnected data.
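
To make the idea concrete, here is a minimal py2neo sketch (the Person label, the KNOWS relationship type, and the connection details are purely illustrative assumptions) that creates two nodes, links them, and reads the relationship back with a single MATCH:

# Minimal sketch: assumes a local Neo4j instance at bolt://localhost:7687
# with the credentials neo4j/password; labels and property names are made up.
from py2neo import Graph, Node, Relationship

graph = Graph("bolt://localhost:7687", auth=("neo4j", "password"))

alice = Node("Person", name="Alice")
bob = Node("Person", name="Bob")
graph.create(Relationship(alice, "KNOWS", bob))

# The relationship is stored directly, so traversing it is a single MATCH
for record in graph.run("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a.name, b.name"):
    print(record)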

A Simple py2neo Application

Connecting to the Graph Database

#!/usr/bin/env python
# -*- coding:utf-8 -*-


import re
import py2neo
import logging
import traceback
logger = logging.getLogger('DK')






# Remove the specified keys from a dict
def pop_dict(data_dict, pop_list):
    for key in pop_list:
        data_dict.pop(key)
    return data_dict

# Drop entries whose value is None from a dict
def none_dict(original: dict):
    return {k: v for k, v in original.items() if v is not None}


class Neo4jHelper(object):
    """Helper class for Neo4j database operations."""
    def __init__(self, neo4j_databases):
        """
        Initialize connection parameters.
        :param neo4j_databases: dict with the keys 'host', 'username' and 'password'
        """
        self.host = neo4j_databases['host']
        self.username = neo4j_databases['username']
        self.password = neo4j_databases['password']
        self.connect()


    # Convert a Python-dict string into a Cypher-friendly map literal
    def json2neo(self, json_str: str):
        '''
        "{'key': 'values'}" --> "{key: 'values'}"
        '''
        pattern = re.compile(r"'(\w+)'(\s*:\s*)")
        return re.sub(pattern, r'\1\2', json_str, count=0)

    def connect(self):
        """Create the connection and the graph object."""
        try:
            self.graph = py2neo.Graph(self.host, auth=(self.username, self.password))
            logger.info("Connected to the Neo4j database successfully")
        except Exception:
            logger.error("Failed to connect to Neo4j, error: {}".format(traceback.format_exc()))

    # Enforce node uniqueness for a label
    def graph_deduplicate(self, label, attid):
        cql = "CREATE CONSTRAINT ON (n:{}) ASSERT n.{} IS UNIQUE".format(label[0], attid)
        try:
            self.graph.run(cql)
            logger.info("Created unique constraint on {} for label {}".format(attid, label))
        except Exception:
            result = self.graph.schema.get_indexes(label[0])
            if result[0][0] == attid:
                logger.info("Label {} already has a unique constraint on {}, nothing to create".format(label, result[0][0]))
            else:
                # py2neo cannot run two statements in one call, so drop and recreate separately
                self.graph.run("DROP CONSTRAINT ON (n:{}) ASSERT n.{} IS UNIQUE".format(label[0], result[0][0]))
                self.graph.run("CREATE CONSTRAINT ON (n:{}) ASSERT n.{} IS UNIQUE".format(label[0], attid))




    # Create nodes in batches
    def create_nodes(self, label, attributes, attid, batchs=1000):
        """
        :param label: node label, e.g. ['label']
        :param attributes: list of property dicts, e.g. [{"id": "01"}, {"id": "02"}]
        :param attid: property used as the unique key, e.g. 'id'
        :param batchs: batch size, default 1000
        :return: number of nodes created or updated
        """
        # Make sure the unique constraint exists first
        self.graph_deduplicate(label, attid)
        total_data_verify = 0
        nodes_batch = [attributes[i:i + batchs] for i in range(0, len(attributes), batchs)]
        for nodes in nodes_batch:
            nodes = [none_dict(dt) for dt in nodes]
            jsnodes = self.json2neo(str(nodes))
            try:
                cypher = "CALL apoc.create.nodes({}, {})".format(label, jsnodes)
                result = self.graph.run(cypher).data()
                total_data_verify = total_data_verify + len(result)
            except Exception:
                logger.info("Duplicate nodes found in this batch, falling back to batched node merging, please wait")
                cypher = """CALL apoc.periodic.iterate(
                            "UNWIND {} as row return row",
                            "CALL apoc.merge.node({}, {{{}: row.{}}}, row, row) yield node return count(*) as value",
                            {{params: {{}}, batchSize: 1000, parallel:true, iterateList:true}})
                             YIELD timeTaken, operations RETURN operations.committed as committed""".format(jsnodes, label, attid, attid)
                result = self.graph.run(cypher).data()[0]['committed']
                total_data_verify = total_data_verify + result

        logger.info("Node creation finished, {} nodes created or updated in this run".format(str(total_data_verify)))
        return total_data_verify

    # Create relationships in batches
    def create_relationships(self, label, start_label, end_label, start_att, end_att, start, end, attributes, batchs=100000):
        '''
        :param label: relationship type to create
        :param start_label: label of the start nodes in the graph
        :param end_label: label of the end nodes in the graph
        :param start_att: property used to match the start nodes in the graph
        :param end_att: property used to match the end nodes in the graph
        :param start: key in attributes holding the start-node value
        :param end: key in attributes holding the end-node value
        :param attributes: e.g. [{"start_id": "01", "end_id": "02", "properties": {"att_one": 1, "att_two": 2}}],
                           where the key names match the start/end parameters
        :param batchs: batch size, default 100000
        :return: number of relationships created
        '''
        attribute_batchs = [attributes[i:i + batchs] for i in range(0, len(attributes), batchs)]
        total_data_verify = 0
        for attribute_batch in attribute_batchs:
            try:
                json_attribute = self.json2neo(str(attribute_batch))
                cypher = """CALL apoc.periodic.iterate(
                            "UNWIND {} as row match (n:{}{{{}:row.{}}}),(m:{}{{{}:row.{}}}) return n,m,row.properties as properties",
                            "CALL apoc.create.relationship(n, '{}', properties, m) YIELD rel RETURN rel",
                            {{params: {{}}, batchSize: 1000, parallel:true, iterateList:true}})
                            YIELD timeTaken,operations RETURN operations.committed as committed""".format(json_attribute, start_label, start_att, start, end_label, end_att, end, label)
                result = self.graph.run(cypher).data()[0]['committed']
                total_data_verify = total_data_verify + result
            except Exception:
                logger.error("Failed to create relationships, error: {}".format(traceback.format_exc()))
                logger.info("Batch that failed: {}".format(str(attribute_batch)))

        logger.info("Relationship creation finished, {} relationships created in this run".format(str(total_data_verify)))
        return total_data_verify






    # Update relationships in batches
    def update_relationships(self, label, start_att, end_att, start, end, attributes, attid, batchs=1000):
        '''
        :param label: relationship type to update
        :param start_att: property used to match the start nodes in the graph
        :param end_att: property used to match the end nodes in the graph
        :param start: key in attributes holding the start-node value
        :param end: key in attributes holding the end-node value
        :param attributes: e.g. [{"start_id": "01", "end_id": "02", "properties": {"att_one": 1, "att_two": 2}}]
        :param attid: relationship property key used to match existing relationships
        :param batchs: batch size, default 1000
        :return: None; relationships are merged on the server
        '''
        attribute_batchs = [attributes[i:i + batchs] for i in range(0, len(attributes), batchs)]
        for attribute_batch in attribute_batchs:
            try:
                json_attribute = self.json2neo(str(attribute_batch))
                cypher = """CALL apoc.periodic.iterate(
                            "UNWIND {} as row match (n {{{}:row.{}}}),(m {{{}:row.{}}}) return n,m,row.properties as properties",
                            "CALL apoc.merge.relationship(n, '{}', {{{}:properties.{}}}, properties, m, properties) yield rel RETURN count(*)",
                            {{params: {{}}, batchSize: 1000, parallel:true, iterateList:true}})
                            YIELD timeTaken,operations RETURN timeTaken, operations""".format(json_attribute, start_att, start, end_att, end, label, attid, attid)
                self.graph.run(cypher)
            except Exception:
                logger.error("Failed to update relationships, error: {}".format(traceback.format_exc()))
                logger.info("Batch that failed: {}".format(str(attribute_batch)))
        logger.info("Relationship update finished, {} relationships created or updated in this run".format(len(attributes)))




    def delete_batch_node_or_relation(self, type, attributes, id: list, batch_size=10000):
        """
        Batch-delete nodes or relationships
        :param type: "nodes" to delete nodes, "relationships" to delete relationships
        :param attributes: property name used to match the ids
        :param id: list of property values to delete
        :param batch_size: batch size per commit
        :return: number of deleted items
        """
        try:
            if type == "nodes":
                cypher = """CALL apoc.periodic.commit("MATCH (n) where n.{} in {} WITH n LIMIT $limit DETACH DELETE n RETURN count(*)",{{limit: {}}})
                            YIELD updates, executions, runtime, batches
                            RETURN updates""".format(attributes, id, batch_size)
                attribute_batch = self.graph.run(cypher).data()[0]['updates']
                logger.info("Deleted {} successfully, {} {} removed in total".format(type, str(attribute_batch), type))
            elif type == "relationships":
                cypher = """CALL apoc.periodic.commit("match (n)-[r]->(m) where r.{} in {} with r LIMIT $limit DELETE r RETURN count(*)",{{limit: {}}})
                            YIELD updates, executions, runtime, batches
                            RETURN updates""".format(attributes, id, batch_size)
                attribute_batch = self.graph.run(cypher).data()[0]['updates']
                logger.info("Deleted {} successfully, {} {} removed in total".format(type, str(attribute_batch), type))
            else:
                logger.info("Invalid type, expected 'nodes' or 'relationships'")
                attribute_batch = 0
        except Exception:
            logger.error("Failed to delete {}, error: {}".format(type, traceback.format_exc()))
            attribute_batch = 0
        return attribute_batch





    # Delete everything in the graph; use with extreme caution
    def delete_all(self):
        self.graph.delete_all()
        node_labels = list(self.graph.schema.node_labels)
        for node_label in node_labels:
            node_ids = self.graph.schema.get_indexes(node_label)
            for node_id in node_ids:
                cypher = "DROP CONSTRAINT ON ({}:{}) ASSERT ({}.{}) IS UNIQUE".format(node_label, node_label, node_label, node_id[0])
                self.graph.run(cypher)
                logger.info("Dropped the unique constraint on property {} of label {}".format(node_id[0], node_label))



    # Close the database connection
    def closeNeo4j(self):
        """Release the graph object."""
        del self.graph




# if __name__ == "__main__":
#     ne4jbase = {"host":'*******',
#         "username": '***',
#         "password":'*******'}
#     NH = Neo4jHelper(ne4jbase)
#     NH.delete_all()
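

As a usage sketch of the helper class above (the connection details, the Person/Company labels, the WORKS_AT relationship type, and the property names are illustrative placeholders; the APOC plugin must be installed on the server for the batch methods to work):

# Usage sketch only: host, credentials, labels and property names are assumptions
if __name__ == "__main__":
    neo4j_base = {"host": "bolt://localhost:7687",
                  "username": "neo4j",
                  "password": "password"}
    helper = Neo4jHelper(neo4j_base)

    # Create nodes keyed by a unique 'id' property
    helper.create_nodes(['Person'], [{"id": "p1", "name": "Alice"},
                                     {"id": "p2", "name": "Bob"}], 'id')
    helper.create_nodes(['Company'], [{"id": "c1", "name": "Acme"}], 'id')

    # Link people to companies; the start_id/end_id keys in each row must match
    # the 'start' and 'end' arguments, and 'properties' holds the edge attributes
    helper.create_relationships('WORKS_AT', 'Person', 'Company', 'id', 'id',
                                'start_id', 'end_id',
                                [{"start_id": "p1", "end_id": "c1", "properties": {"since": 2020}},
                                 {"start_id": "p2", "end_id": "c1", "properties": {"since": 2021}}])

    helper.closeNeo4j()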