In einem früheren Beitrag haben wir über die Suche nach Partitionen in einem unverbundenen Graphen mithilfe von Cascading gesprochen. In Wirklichkeit sind die meisten Graphen vollständig verbunden, so dass es nicht sehr hilfreich ist, nur bereits getrennte Graphen partitionieren zu können. In diesem Beitrag werfen wir einen Blick auf die Partitionierung eines zusammenhängenden Graphen anhand einiger Kriterien für die Erstellung einer Partitionsgrenze.
Nehmen wir den Beispielgraphen aus dem vorherigen Beitrag und machen wir ihn zu einem zusammenhängenden Graphen. Dazu fügen wir zwei zusätzliche Knoten hinzu, die mit vielen anderen Knoten im Graphen verbunden sind. Etwa so:
Wie Sie sehen können, ist der Graph durch die beiden stark verbundenen Knoten vollständig verbunden. Wenn Sie die stark verbundenen (roten) Knoten entfernen würden, wäre der Graph derselbe, unverbundene Graph wie im Beispiel aus dem vorherigen Beitrag.
Kriterium der Aufteilung
Um herauszufinden, wo die Partitionen im Graphen erstellt werden sollen, benötigen wir ein Kriterium, um zu entscheiden, ob eine bestimmte Kante bei der Partitionierung berücksichtigt werden soll oder nicht. In unserem Beispiel verwenden wir die Anzahl der eingehenden Verbindungen eines jeden Knotens (Integrad), um zu bestimmen, ob die Kanten zu diesem Knoten in eine Partition aufgenommen werden sollen. Wir zählen also die Anzahl der eingehenden Kanten jedes Knotens und markieren dann die Kanten, die auf Knoten zeigen, deren Anzahl der eingehenden Kanten einen bestimmten Schwellenwert übersteigt, als Kanten, denen nicht gefolgt werden soll. Das funktioniert, weil die beiden Knoten, die alles miteinander verbinden, eine hohe Anzahl von eingehenden Verbindungen haben. Dies gilt natürlich nicht für alle Graphen. In anderen Fällen benötigen Sie vielleicht andere Partitionierungskriterien. Wenn die Knoten in einem Diagramm beispielsweise geografische Orte darstellen, könnten Sie die Entfernung zwischen den Knoten verwenden, um zu bestimmen, ob die Kante zwischen ihnen verfolgt werden soll. Jede andere Funktion von zwei Knoten, die bestimmt, ob die Kante verfolgt werden soll, funktioniert ebenfalls (solange diese Funktion symmetrisch ist). Beachten Sie auch, dass wir Kanten markieren, die vom Verfolgen ausgeschlossen werden sollen, nicht Knoten.
Repräsentation
Sobald wir festgelegt haben, welche Kanten wir verfolgen und welche wir auslassen sollen, müssen wir diese Informationen irgendwie mit dem Graphen speichern, so dass der iterative Job diese Informationen nutzen kann. Eine Lösung wäre, den Indegree jedes Knotens als Teil der Daten zu speichern und für jede Kante auszuwerten, ob wir ihr folgen sollten oder nicht. Dies ist jedoch problematisch. Für die Anzahl der Kanten würde es funktionieren, da die dafür erforderliche Datenmenge überschaubar ist. Für komplexere Partitionierungskriterien, die viele Informationen über jeden Knoten benötigen, wäre die Menge der übertragenen Daten allein für die Partitionierung jedoch potenziell sehr groß. Sobald wir herausgefunden haben, welche Kanten interessant sind und welche nicht, speichern wir diese Tatsache einfach zusammen mit dem Graphen. Wir tun dies in Form eines binären Vektors neben jeder Adjazenzliste in der ursprünglichen Darstellung. Im Bitvektor bedeutet eine 1, dass eine Kante enthalten ist, und eine 0, dass eine Kante ausgeschlossen ist. Für den obigen Graphen mit den beiden stark verbundenen Knoten würde das folgendermaßen aussehen:
3 0 200,2,1,100,3 0,1,1,0,1 4 1 200,100,4 0,0,1 5 2 5,3,200,100 1,1,0,0 3 3 200,100 0,0 7 4 100,5,7,200 0,1,1,0 6 5 100,200,6 0,0,1 6 6 100,200 0,0 7 7 100,3,200 0,1,0 11 10 100,11 0,1 12 11 12,100 1,0 14 12 13,14,100,10 1,1,0,1 14 13 14,100 1,0 14 14 100 0 26 20 22,21,26,25,100,24,23 1,1,1,1,0,1,1 21 21 100 0 28 22 27,28,100 1,1,0 23 23 100 0 24 24 100 0 25 25 100 0 26 26 100 0 27 27 100 0 29 28 23,100,29 1,0,1 29 29 100 0 31 30 31,200 1,0 32 31 32,200 1,0 33 32 33,200 1,0 34 33 200,34 0,1 35 34 200,35 0,1 36 35 200,36 0,1 36 36 200 0 43 40 41,200,42,43 1,0,1,1 44 41 44,200 1,0 43 42 200,43,41 0,1,1 43 43 200 0 44 44 200 0
Kantenzählung
Um die Datei wie oben beschrieben zu erstellen, müssen wir den Indegree für jeden Knoten im Graphen berechnen und anschließend Adjazenzlisten zusammen mit den Bit-Vektoren erstellen. Jeder Vektor sollte eine 0 für Positionen enthalten, an denen die Adjazenzliste einen Zielknoten mit einem hohen Indegree (höher als ein festgelegter Schwellenwert) aufweist. Dazu nehmen wir den ursprünglichen Graphen in der Kantenlisten-Darstellung (ein Beispiel finden Sie im vorherigen Beitrag) und befolgen diese Schritte:
- Nach Zielknoten gruppieren
- Zählen Sie die Anzahl der Vorkommen jedes Zielknotens (diese Anzahl ist der Indegree für jeden Knoten)
- Verbinden Sie dieses Ergebnis wieder mit der ursprünglichen Menge, so dass die Liste nun lautet:
sourcetargetcount - Nach Quellknoten gruppieren
- Erstellen Sie für jede Gruppe eine Adjazenzliste und einen Bitvektor
Cascading verfügt über die praktische Funktion, eine .dot-Datei zu schreiben, die einen von Ihnen erstellten Ablauf darstellt. Sie können diese .dot-Dateien mit einem Tool wie GraphViz öffnen, um sie in eine schöne visuelle Darstellung Ihres Ablaufs zu verwandeln. Was Sie unten sehen, ist der Ablauf für den Job, der die Zählungen und anschließend das Diagramm erstellt. Den Code für diesen Auftrag finden Sie hier.
Überspringen von Kanten
Als nächstes müssen wir im iterativen Teil unseres Ansatzes zur Partitionierung des Graphen sicherstellen, dass eine Kante, die eine entsprechende 0 im Bitvektor hat, nicht zwei Partitionen miteinander verbindet. In Wirklichkeit ist dies sehr einfach zu erreichen. Da wir mit Hilfe der Sortierung (unter Verwendung einer sekundären Sortierung) sicherstellen, dass wir immer die größtmögliche Partition wählen, müssen wir nur darauf achten, dass wir die Kanten, denen wir nicht folgen sollen, später sortieren als die, denen wir folgen sollen. Das bedeutet, dass wir jedes Mal, wenn wir Kanten gruppieren, nur ein Feld zur Sortierungsspezifikation hinzufügen müssen. Hier ist der vollständige Code dafür:
öffentlich Klasse IterateWithFlagsJob { öffentlich int laufen(Zeichenfolge Eingabe, Zeichenfolge Ausgabe, int maxIterations) { boolean fertig = false; int iterationCount = 0; während (!fertig) { Schema sourceScheme = neu TextDelimited(neu Felder("Partition", "Quelle", "Liste", "Flaggen"), "t"); Tippen Sie auf Quelle = neu Hfs(sourceScheme, currentIterationInputPath); Schema sinkScheme = neu TextDelimited(neu Felder("Partition", "Quelle", "Liste", "Flaggen"), "t"); Tippen Sie auf Waschbecken = neu Hfs(sinkScheme, currentIterationOutputPath, SinkMode.ERSETZEN); //SNIPPED SOME BOILERPLATE... Pfeife Iteration = neu Pfeife("Iteration"); //Für jeden Eingabedatensatz erstellen Sie einen Datensatz pro Knoten (Schritt 3) Iteration = neu Jede(Iteration, neu FanOut()); //GROUP BY node ORDER BY flag, partition DESCENDING Iteration = neu GroupBy( Iteration, neu Felder("Knoten"), neu Felder("Flagge", "Partition"), //Es funktioniert wegen der Flagge! true); //Für jede Gruppe, erstellen Sie Datensätze mit der größten Partition Iteration = neu Jede( Iteration, neu MaxPartitionToTuples(), Felder.ERGEBNISSE); //GROUP BY Quellknoten ORDER BY Partition DESCENDING (Schritt 4) Iteration = neu GroupBy( Iteration, neu Felder("Quelle"), neu Felder("Flagge", "Partition"), //wiederum die Flagge! true); //Für jede Gruppe erstellen Sie die Adjazenzliste neu //mit der größten Partition //Dieser Schritt aktualisiert auch den Zähler Iteration = neu Jede( Iteration, neu MaxPartitionToAdjacencyList(), Felder.ERGEBNISSE); //SNIPPED SOME BOILERPLATE... Durchfluss.vollständig(); //Zählerstand erfassen? lang updatedPartitions = Durchfluss.getFlowStats().getCounterValue( MaxPartitionToAdjacencyList.ZÄHLER_GRUPPE, MaxPartitionToAdjacencyList.PARTITIONEN_AKTUALISIERT_ZÄHLER_NAME); // Sind wir fertig? fertig = updatedPartitions == 0 || iterationCount == maxIterations - 1; iterationCount++; } return 0; } privat Klasse MaxPartitionToAdjacencyListContext { int Quelle; int Partition = -1; ListeInteger> Ziele; ListeByte> Flaggen; public MaxPartitionToAdjacencyListContext() { this.Targets = new ArrayListInteger>(); dies.Flaggen = new ArrayListByte>(); } } @WarnungenUnterdrücken("serial") private class MaxPartitionToAdjacencyList extends BaseOperationMaxPartitionToAdjacencyListContext> implementiert AggregatorMaxPartitionToAdjacencyListContext> { public final String PARTITIONEN_AKTUALISIERT_ZAEHLER_NAME = "Partitions Updates"; public final String COUNTER_GROUP = "graphs"; public MaxPartitionToAdjacencyList() { super(new Felder("partition", "Quelle", "Liste", "Flaggen")); } @Override public void start( FlowProcess flowProcess, AggregatorCallMaxPartitionToAdjacencyListContext> AggregatorAufruf) { MaxPartitionToAdjacencyListContext Kontext = neu MaxPartitionToAdjacencyListContext(); Kontext.Quelle = aggregatorAufruf.getGroup().getInteger("Quelle"); aggregatorAufruf.setKontext(Kontext); } @Override public void aggregate( FlowProcess flowProcess, AggregatorCallMaxPartitionToAdjacencyListContext> AggregatorAufruf) { MaxPartitionToAdjacencyListContext Kontext = AggregatorAufruf.getContext(); TupleEntry Argumente = AggregatorAufruf.getArguments(); int Knoten = Argumente.getInteger("Knoten"); int Partition = Argumente.getInteger("Partition"); boolean Flagge = Argumente.getBoolean("Flagge"); wenn (Kontext.Partition == -1) { Kontext.Partition = Partition; } sonst { wenn (Flagge && Kontext.Partition > Partition) { flowProcess.inkrementieren(COUNTER_GROUP, PARTITIONEN_AKTUALISIERT_ZAEHLER_NAME, 1); } } wenn (Knoten != Kontext.Quelle) { Kontext.Ziele.hinzufügen(Knoten); //hier konvertieren Sie boolesche Flags zurück in 1's oder 0's Kontext.Flaggen.hinzufügen((Byte) (Flagge ? 1 : 0)); } } @Override public void complete( FlowProcess flowProcess, AggregatorCallMaxPartitionToAdjacencyListContext> AggregatorAufruf) { MaxPartitionToAdjacencyListContext Kontext = aggregatorAufruf.getContext(); Tupel Ergebnis = neu Tupel( Kontext.Partition, Kontext.Quelle, StringUtils.joinObjects(",", Kontext.Ziele), StringUtils.joinObjects(",", Kontext.Flaggen)); //Hier sind die Flaggen aggregatorAufruf.getOutputCollector().hinzufügen(Ergebnis); } } @UnterdrückenWarnungen({ "seriell", "Rohtypen", "unkontrolliert" }) privat Klasse MaxPartitionToTuples erweitert BaseOperation implementiert Puffer { öffentlich MaxPartitionToTuples() { super(neu Felder("Partition", "Knoten", "Quelle", "Flagge")); } @Override public void operate( FlowProcess flowProcess, PufferAufruf bufferCall) { IteratorTupleEntry> itr = bufferCall.getArgumentsIterator(); int maxPartition; TupleEntry Eintrag = itr.nächste(); maxPartition = Eintrag.getInteger("partition"); emitTuple(bufferCall, maxPartition, Eintrag); während (itr.hasNext()) { Eintrag = itr.nächste(); emitTuple(bufferCall, maxPartition, Eintrag); } } privat void emitTuple( BufferCall bufferCall, int maxPartition, TupleEntry Eintrag) { Tupel Ergebnis = neu Tupel( maxPartition, Eintrag.getInteger("Knoten"), Eintrag.getInteger("Quelle"), Eintrag.getBoolean("Flagge")); bufferCall.getOutputCollector().hinzufügen(Ergebnis); } } @UnterdrückenWarnungen({ "seriell", "Rohtypen" }) privat Klasse FanOut erweitert BaseOperation implementiert Funktion { öffentlich FanOut() { super(neu Felder("Partition", "Knoten", "Quelle", "Flagge")); } @Override public void operate( FlowProcess flowProcess, FunctionCall functionCall) { TupleEntry args = functionCall.getArguments(); int Partition = args.getInteger("Partition"); int Quelle = args.getInteger("Quelle"); Tupel Ergebnis = neu Tupel(Partition, Quelle, Quelle, wahr); functionCall.getOutputCollector().hinzufügen(Ergebnis); Zeichenfolge[] nodeList = args.getString("Liste").geteilt(","); Zeichenfolge[] flagList = args.getString("Flaggen").geteilt(","); für (int c = 0; c nodeList.Länge; c++) { //Während des Fanout konvertieren wir die Flaggen in Boolesche Werte Ergebnis = neu Tupel( Partition, Integer.parseInt(nodeList[c]), Quelle, Integer.parseInt(flagList[c]) != 0); functionCall.getOutputCollector().hinzufügen(Ergebnis); } } } }
Die wichtigen Dinge hier sind:
- Die Tatsache, dass dieser Code fast genau derselbe ist wie der aus dem vorherigen Beitrag.
- Außer, dass er die Flaggen während der Arbeit mit sich herumträgt.
- Und dass es das Flag-Feld in der Sortierreihenfolge der Kanten verwendet, um sicherzustellen, dass es niemals die Partition einer Kante auswählt, die eine 0 als Flag gesetzt hat.
Das Ergebnis
Wenn wir den endgültigen Algorithmus gegen den Graphen laufen lassen, ergibt sich das folgende Bild:

Unsere Ideen
Weitere Blogs
Contact




