Поделиться через


Руководство пользователя GraphFrames — Scala

В этой статье приведены примеры из руководства пользователя GraphFrames.

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.graphframes._

Создание graphFrames

Графовые кадры можно создавать на основе вершин и пограничных кадров данных.

  • Кадр данных вершин. Кадр данных вершины должен содержать специальный столбец с именем id , который указывает уникальные идентификаторы для каждой вершины в графе.
  • Пограничный кадр данных. Пограничный кадр данных должен содержать два специальных столбца: src (идентификатор исходной вершины ребра) и dst (идентификатор конечной вершины ребра).

Оба кадра данных могут иметь произвольные другие столбцы. Эти столбцы могут представлять атрибуты вершин и ребер.

Создание вершин и ребер

// Vertex DataFrame
val v = spark.createDataFrame(List(
  ("a", "Alice", 34),
  ("b", "Bob", 36),
  ("c", "Charlie", 30),
  ("d", "David", 29),
  ("e", "Esther", 32),
  ("f", "Fanny", 36),
  ("g", "Gabby", 60)
)).toDF("id", "name", "age")
// Edge DataFrame
val e = spark.createDataFrame(List(
  ("a", "b", "friend"),
  ("b", "c", "follow"),
  ("c", "b", "follow"),
  ("f", "c", "follow"),
  ("e", "f", "follow"),
  ("e", "d", "friend"),
  ("d", "a", "friend"),
  ("a", "e", "friend")
)).toDF("src", "dst", "relationship")

Давайте создадим граф из этих вершин и этих ребер:

val g = GraphFrame(v, e)
// This example graph also comes with the GraphFrames package.
// val g = examples.Graphs.friends

Базовые запросы графа и кадра данных

GraphFrames предоставляют простые запросы графа, такие как степень узла.

Кроме того, так как графовые кадры представляют графы как пары вершинных и пограничных кадров данных, можно легко создавать мощные запросы непосредственно к вершинным и пограничным кадрам данных. Эти кадры данных доступны в виде полей вершин и ребер в graphFrame.

display(g.vertices)
display(g.edges)

Входящая степень вершин:

display(g.inDegrees)

Исходящая степень вершин:

display(g.outDegrees)

Степень вершин:

display(g.degrees)

Запросы можно выполнять непосредственно в кадре данных вершин. Например, на графике можно найти возраст самого молодого человека:

val youngest = g.vertices.groupBy().min("age")
display(youngest)

Аналогичным образом можно выполнять запросы к кадру данных ребер. Например, давайте подсчитаем количество связей "следовать" на графе:

val numFollows = g.edges.filter("relationship = 'follow'").count()

Поиск мотива

Создавайте более сложные связи с ребрами и вершинами с помощью мотивов. Следующая ячейка находит пары вершин с краями в обоих направлениях между ними. Результатом является Кадр данных, в котором имена столбцов являются ключами мотивов.

Дополнительные сведения об API см. в руководстве пользователя GraphFrame .

// Search for pairs of vertices with edges in both directions between them.
val motifs = g.find("(a)-[e]->(b); (b)-[e2]->(a)")
display(motifs)

Так как результатом является кадр данных, можно создавать более сложные запросы поверх мотива. Давайте выясним все взаимные отношения, в которых один человек старше 30 лет:

val filtered = motifs.filter("b.age > 30")
display(filtered)

Запросы с отслеживанием состояния

Большинство запросов motif без отслеживания состояния и просты в выражении, как в примерах выше. В следующих примерах показаны более сложные запросы, которые несут состояние по пути в мотиве. Выражайте эти запросы, объединяя поиск мотива GraphFrame с фильтрами по результату, где фильтры используют операции последовательности для создания ряда столбцов DataFrame.

Например, предположим, что вы хотите определить цепочку из 4 вершин с определенным свойством, определенным последовательностью функций. То есть, среди цепочек из 4 вершин a->b->c->dидентифицируйте подмножество цепей, соответствующих этому сложному фильтру:

  • Инициализация состояния по пути.
  • Обновление состояния на основе вершины a.
  • Обновление состояния на основе вершины b.
  • Др. для c и d.
  • Если конечное состояние соответствует определенному условию, фильтр принимает цепочку.

Этот процесс демонстрируется в следующих фрагментах кода, где мы определяем цепочки из 4 вершин таким образом, что по крайней мере 2 из 3 ребер являются "дружественными" связями. В этом примере состоянием является текущее количество "дружественных" ребер; как правило, это может быть любой столбец DataFrame.

// Find chains of 4 vertices.
val chain4 = g.find("(a)-[ab]->(b); (b)-[bc]->(c); (c)-[cd]->(d)")

// Query on sequence, with state (cnt)
//  (a) Define method for updating state given the next element of the motif.
def sumFriends(cnt: Column, relationship: Column): Column = {
  when(relationship === "friend", cnt + 1).otherwise(cnt)
}
//  (b) Use sequence operation to apply method to sequence of elements in motif.
//      In this case, the elements are the 3 edges.
val condition = Seq("ab", "bc", "cd").
  foldLeft(lit(0))((cnt, e) => sumFriends(cnt, col(e)("relationship")))
//  (c) Apply filter to DataFrame.
val chainWith2Friends2 = chain4.where(condition >= 2)
display(chainWith2Friends2)

Подграфы

GraphFrames предоставляет API для создания вложенных графов путем фильтрации по ребрам и вершинам. Эти фильтры могут объединяться. Например, в следующем подграфе содержатся только друзья и старше 30 лет.

// Select subgraph of users older than 30, and edges of type "friend"
val g2 = g
  .filterEdges("relationship = 'friend'")
  .filterVertices("age > 30")
  .dropIsolatedVertices()

Сложные триплетные фильтры

В следующем примере показано, как выбрать подграф на основе тройных фильтров, которые работают с ребром и его вершинами "src" и "dst". Расширить этот пример, чтобы выйти за рамки тройнях, используя более сложные мотивы, очень просто.

// Select subgraph based on edges "e" of type "follow"
// pointing from a younger user "a" to an older user "b".
val paths = g.find("(a)-[e]->(b)")
  .filter("e.relationship = 'follow'")
  .filter("a.age < b.age")
// "paths" contains vertex info. Extract the edges.
val e2 = paths.select("e.src", "e.dst", "e.relationship")
// In Spark 1.5+, the user may simplify this call:
//  val e2 = paths.select("e.*")

// Construct the subgraph
val g2 = GraphFrame(g.vertices, e2)
display(g2.vertices)
display(g2.edges)

Стандартные алгоритмы графа

В этом разделе описываются стандартные алгоритмы графа, встроенные в GraphFrames.

Поиск в ширину (BFS)

Выполните поиск пользователей в возрасте 32 лет < в разделе "Esther".

val paths: DataFrame = g.bfs.fromExpr("name = 'Esther'").toExpr("age < 32").run()
display(paths)

Поиск также может ограничивать фильтры ребер и максимальную длину пути.

val filteredPaths = g.bfs.fromExpr("name = 'Esther'").toExpr("age < 32")
  .edgeFilter("relationship != 'friend'")
  .maxPathLength(3)
  .run()
display(filteredPaths)

Подключенные компоненты

Вычисление членства в подключенном компоненте каждой вершины и возврат графа с каждой вершиной, назначенной идентификатором компонента.

val result = g.connectedComponents.run() // doesn't work on Spark 1.4
display(result)

Компоненты со строгой связью

Вычислить строго связанный компонент (SCC) каждой вершины и вернуть граф с каждой вершиной, назначенной SCC, содержащей ее.

val result = g.stronglyConnectedComponents.maxIter(10).run()
display(result.orderBy("component"))

Распространение меток

Запустите статический алгоритм распространения меток для обнаружения сообществ в сетях.

Каждый узел в сети изначально назначается отдельному сообществу. При каждом супершаге узлы отправляют свою принадлежность к сообществу всем соседям и обновляют свое состояние до присоединения входящих сообщений к сообществу в режиме.

LPA — это стандартный алгоритм обнаружения сообщества для графов. Это недорогое вычисление, хотя (1) конвергенция не гарантируется, и (2) можно получить тривиальные решения (все узлы идентифицируются в одном сообществе).

val result = g.labelPropagation.maxIter(5).run()
display(result.orderBy("label"))

Pagerank

Определите важные вершины в графе на основе соединений.

// Run PageRank until convergence to tolerance "tol".
val results = g.pageRank.resetProbability(0.15).tol(0.01).run()
display(results.vertices)
display(results.edges)
// Run PageRank for a fixed number of iterations.
val results2 = g.pageRank.resetProbability(0.15).maxIter(10).run()
display(results2.vertices)
// Run PageRank personalized for vertex "a"
val results3 = g.pageRank.resetProbability(0.15).maxIter(10).sourceId("a").run()
display(results3.vertices)

Кратчайшие пути

Вычисляет кратчайшие пути к заданному набору вершин ориентира, где ориентиры задаются идентификатором вершины.

val paths = g.shortestPaths.landmarks(Seq("a", "d")).run()
display(paths)

Подсчет треугольников

Вычисляет количество треугольников, проходящих через каждую вершину.

import org.graphframes.examples
val g: GraphFrame = examples.Graphs.friends  // get example graph

val results = g.triangleCount.run()
results.select("id", "count").show()