搜索
您的当前位置:首页正文

Giraph源码分析(九)Aggregators原理解析

2020-11-09 来源:哗拓教育

本人原创,转载请注明出处!欢迎大家加入 Giraph 技术交流群 : 228591158 Giraph中Aggregator的用法请参考官方文档:http://giraph.apache.org/aggregators.html ,本文重点在解析Giraph如何实现 Aggregators 。 基本原理 :在每个超级步中,每个Worker计算

本人原创,转载请注明出处!欢迎大家加入Giraph 技术交流群: 228591158

Giraph中Aggregator的用法请参考官方文档:http://giraph.apache.org/aggregators.html ,本文重点在解析Giraph如何实现Aggregators。

基本原理:在每个超级步中,每个Worker计算本地的聚集值。超级步计算完成后,把本地的聚集值发送给Master汇总。在MasterCompute()执行后,把全局的聚集值回发给所有的Workers。

缺点:当某个应用(或算法)使用了多个聚集器(Aggregators),Master要完成所有聚集器的计算。因为Master要接受、处理、发送大量的数据,无论是在计算方面还是网络通信层次,都会导致Master成为系统瓶颈。

改进:采用分片聚集 (sharded aggregators) . 在每个超级步的最后,每个聚集器被派发给一个Worker,该Worker接受和聚集其他Workers发送给该聚集器的值。然后Workers把自己的所有的聚集器发送给Master,这样Master就无需执行任何聚集,只是接收每个聚集器的最终值。在MasterCompute.compute执行后,Master不是直接把所有的聚集器发送给所有的Workers,而是发送给聚集器所属的Worker,然后每个Worker再把其上的聚集器发送给所有的Workers.

首先给出Master <-- > Worker间, Worker <--> Worker间通信协议,在每个类中的doRequest(ServerData serverData)方法中会解析并存储收到的消息。
1). org.apache.giraph.comm.requests.SendWorkerAggregatorsRequest 类 . Worker --> Worker Owner
功能:每个worker把当前超步的局部 aggregated values 发送到该Aggregator的拥有者。
2). org.apache.giraph.comm.requests.SendAggregatorsToMasterRequest 类. Worker Owner--> Master
功能:每个Worker把自己所拥有的Aggregator的最终 aggregated values 发送给 master。
3). org.apache.giraph.comm.requests.SendAggregatorsToOwnerRequest 类. Master --> Worker Owner.
功能:master把最终的 aggregated values 或aggregators 发送给该Aggregator的拥有者。
4). org.apache.giraph.comm.requests.SendAggregatorsToWorkerRequest 类。 Worker Owner--> Worker
功能: 发送最终的 aggregated values 到 其他workers。发送者为该Aggregator的拥有者,接受者为除发送者之外的所有workers。

\

IDC49rOsvLayvU1hc3RlckNvbXB1dGUuY29tcHV0ZSgpt723qNbQu/G1w7XEvtu8r8b3JiMyMDU0MDu++c6qxuSz9cq8JiMyMDU0MDs8L3N0cm9uZz6ho7TTtdoxuPazrLy2sr2/qsq8o6xNYXN0ZXJDb21wdXRlLmNvbXB1dGUoKbe9t6iyxbvxtcPBy8v509BWZXJ0ZXguY29tcHV0ZSgp1Nq12jC49rOsvLayvb7bvK+1xCYjMjA1NDA7oaM8L3A+CjxwPjEuILTTtdowuPazrLy2sr2/qsq8o6xCc3BTZXJ2aWNlTWFzdGVytffTw01hc3RlckFnZ3JlZ2F0b3JIYW5kbGVywOC1xGZpbmlzaFN1cGVyU3RlcChNYXN0ZXJDbGllbnQgbWFzdGVyQ2xpZW50KSC3vbeosNG+27yvxvfFybeiuPhXb3JrZXKjrL7bvK/G97XEdmFsdWXOqsnP0ru49rOsvLayvbXEyKu+1r7bvK8mIzIwNTQwO6OoZmluYWwgYWdncmVnYXRlZCB2YWx1ZXOjqaOstdrSu7TOzqqz9cq8JiMyMDU0MDuho8/IuPiz9k1hc3RlckFnZ3JlZ2F0b3JIYW5kbGVytcTA4LzMs9C52M+1o6zI58/Co7o8L3A+CjxwPjxpbWcgc3JjPQ=="http://www.2cto.com/uploadfile/Collfiles/20140523/201405230851307.jpg" alt="\">

finishSuperStep(MasterClient masterClient) 方法核心内容如下:

<喎?http://www.2cto.com/kf/ware/vc/" target="_blank" class="keylink">vcD4KPHByZSBjbGFzcz0="brush:sql;"> /** * Finalize aggregators for current superstep and share them with workers */ public void finishSuperstep(MasterClient masterClient) { for (AggregatorWrapper aggregator : aggregatorMap.values()) { if (aggregator.isChanged()) { // if master compute changed the value, use the one he chose aggregator.setPreviousAggregatedValue( aggregator.getCurrentAggregatedValue()); // reset aggregator for the next superstep aggregator.resetCurrentAggregator(); } } /** * 把聚集器发送给所属的Worker。发送内容: * 1). Name of the aggregator * 2). Class of the aggregator * 3). Value of the aggretator */ try { for (Map.Entry> entry : aggregatorMap.entrySet()) { masterClient.sendAggregator(entry.getKey(), entry.getValue().getAggregatorClass(), entry.getValue().getPreviousAggregatedValue()); } masterClient.finishSendingAggregatedValues(); } catch (IOException e) { throw new IllegalStateException("finishSuperstep: " + "IOException occurred while sending aggregators", e); } }
问题1:如何确定aggregator的Worker Owner ?

答:根据aggregator的Name来确定它所属的Worker,计算方法如下:

/**
 * 根据aggregatorName和所有的workers列表来计算aggregator所属的Worker
 * 参数aggregatorName:Name of the aggregator
 * 参数workers: Workers的list列表
 * 返回值:Worker which owns the aggregator
 */
public static WorkerInfo getOwner(String aggregatorName,List workers) {
 //用aggregatorName的HashCode()值模以 Workers的总数目
 int index = Math.abs(aggregatorName.hashCode() % workers.size());
 return workers.get(index); //返回aggregator所属的Worker
}
问题2:Worker 如何判断自身是否接收完自己所拥有的aggregators?

答:Master给某个Worker发送aggregators时,同时发送到该Worker的aggregators数目。使用的 SendAggregatorsToOwnerRequest类对消息进行封装和解析。

2. Worker接受Master发送的Aggregator,Worker把接收到的聚集体值发送给其他所有Workers,然后每个Workers就会得到上一个超级步的全局聚集值。

由前文知道,每个Worker都有一个ServerData对象,ServerData类中关于Aggregator的两个成员变量如下:

// 保存Worker在当前超步拥有的aggregators
private final OwnerAggregatorServerData ownerAggregator;
// 保存前一个超步的aggregators
private final AllAggregatorServerData allAggregatorData;

可以看到,ownerAggregatorData用来存储在当前超步Master发送给Worker的聚集器,allAggregatorData用来保存上一个超级步全局的聚集值。ownerAggregatorData和allAggregatorData值的初始化在SendAggregatorsToOwnerRequest 类中的doRequest(ServerData serverData)方法中,如下:
public void doRequest(ServerData serverData) {
 DataInput input = getDataInput();
 AllAggregatorServerData aggregatorData = serverData.getAllAggregatorData();
 try {
 //收到的Aggregators数目。在CountingOutputStream类中有计数器counter,
 //每向
输出流中添加一个聚集器对象,计数加1. 发送时,在flush方法中把该值插入到输出流最前面。 int numAggregators = input.readInt(); for (int i = 0; i < numAggregators; i++) { String aggregatorName = input.readUTF(); String aggregatorClassName = input.readUTF(); if (aggregatorName.equals(AggregatorUtils.SPECIAL_COUNT_AGGREGATOR)) { LongWritable count = new LongWritable(0); //Master发送给该Worker的requests总数目. count.readFields(input); aggregatorData.receivedRequestCountFromMaster(count.get(), getSenderTaskId()); } else { Class> aggregatorClass = AggregatorUtils.getAggregatorClass(aggregatorClassName); aggregatorData.registerAggregatorClass(aggregatorName, aggregatorClass); Writable aggregatorValue = aggregatorData.createAggregatorInitialValue(aggregatorName); aggregatorValue.readFields(input); //把收到的上一次全局聚集的值赋值给allAggregatorData aggregatorData.setAggregatorValue(aggregatorName, aggregatorValue); //ownerAggregatorData只接受聚集器 serverData.getOwnerAggregatorData().registerAggregator( aggregatorName, aggregatorClass); } } } catch (IOException e) { throw new IllegalStateException("doRequest: " + "IOException occurred while processing request", e); } //接受一个 request,计数减1,同时把收到的Data添加到allAggregatorServerData的List masterData中 aggregatorData.receivedRequestFromMaster(getData()); }

每个Worker在开始计算前,会调用BspServiceWorker类的prepareSuperStep()方法来进行聚集器值的派发和接受其他Workers发送的聚集器值。调用关系如下:

\

BspServiceWorker类的prepareSuperStep()方法如下:

@Override
public void prepareSuperstep() {
 if (getSuperstep() != INPUT_SUPERSTEP) {
	 /*
	 * aggregatorHandler为WorkerAggregatorHandler类型.可参考上文中MasterAggregatorHandler的类继承关系
	 * workerAggregatorRequestProcessor声明为WorkerAggregatorRequestProcessor(接口)类型,
	 * 实际为NettyWorkerAggregatorRequestProcessor的实例,用于Worker间发送聚集器的值。
	 */
 aggregatorHandler.prepareSuperstep(workerAggregatorRequestProcessor);
 }
}

WorkerAggregatorHandler类的prepareSuperstep( WorkerAggregatorRequestProcessor requestProcessor)方法如下:
public void prepareSuperstep(WorkerAggregatorRequestProcessor requestProcessor) {
 AllAggregatorServerData allAggregatorData =
 serviceWorker.getServerData().getAllAggregatorData();
 /**
 * 等待直到Master发送给该Worker的聚集器都已接受完,
 * 返回值为Master发送给该Worker的所有Data(聚集器)
 */
 Iterable dataToDistribute =
 allAggregatorData.getDataFromMasterWhenReady(
 serviceWorker.getMasterInfo());
 
 // 把从Master收到的Data(聚集器)发送给其他所有Workers
 requestProcessor.distributeAggregators(dataToDistribute);

 // 等待直到接受完其他Workers发送给该Workers的聚集器
 allAggregatorData.fillNextSuperstepMapsWhenReady(
 getOtherWorkerIdsSet(), previousAggregatedValueMap,
 currentAggregatorMap);
 // 只是清空allAggregatorServerData的List masterData对象
 // 为下一个超级步接受Master发送的聚集器做准备
 allAggregatorData.reset();
}
下面详述Worker如何判定已接收完所有Master发送的所有Request ? 主要目的在于描述分布式环境下线程间如何协作。在AllAggregatorServerData类中定义了TaskIdsPermitBarrier类型的变量masterBarrier,用来判断是否接收完Master发送的Request. TaskIdsPermitBarrier类中主要使用wait()、notifyAll()等方法来控制,当获得的aggregatorName等于AggregatorUtils.SPECIAL_COUNT_AGGREGATOR时,会调用requirePermits(long permits,int taskId)来增加接收的arrivedTaskIds和需要等待的request数目waitingOnPermits. 接受一个Request
 /**
 * Require more permits. This will increase the number of times permits
 * were required. Doesn't wait for permits to become available.
 *
 * @param permits Number of permits to require
 * @param taskId Task id which required permits
 */
 public synchronized void requirePermits(long permits, int taskId) {
 arrivedTaskIds.add(taskId);
 waitingOnPermits += permits;
 notifyAll();
 }
\ 接受一个Request后,会调用releaseOnePermit()方法把waitingOnPermits减1。 \

3. 在Vertex.compute()方法中,每个Worker聚集自身的值。计算完成后,调用WorkerAggregatorHandler类的finishSuperstep( WorkerAggregatorRequestProcessor requestProcessor)方法,把本地的聚集器的值给句聚集器的aggregatorName发送给该aggregator所属的Worker. Aggregator的属主Worker接受其他所有Workers发送的本地聚集值进行汇总,汇总完毕后发送给Master,供下一次超级步的MasterCompute.compute()方法使用。finishSuperstep方法如下:
 /**
 * Send aggregators to their owners and in the end to the master
 *
 * @param requestProcessor Request processor for aggregators
 */
 public void finishSuperstep(
 WorkerAggregatorRequestProcessor requestProcessor) {
 OwnerAggregatorServerData ownerAggregatorData =
 serviceWorker.getServerData().getOwnerAggregatorData();
 // First send partial aggregated values to their owners and determine
 // which aggregators belong to this worker
 for (Map.Entry> entry :
 currentAggregatorMap.entrySet()) {
 boolean sent = requestProcessor.sendAggregatedValue(entry.getKey(),
 entry.getValue().getAggregatedValue());
 if (!sent) {
 // If it's my aggregator, add it directly
 ownerAggregatorData.aggregate(entry.getKey(),
 entry.getValue().getAggregatedValue());
 }
 }
 // Flush
 requestProcessor.flush();
 // Wait to receive partial aggregated values from all other workers
 Iterable> myAggregators =
 ownerAggregatorData.getMyAggregatorValuesWhenReady(
 getOtherWorkerIdsSet());

 // Send final aggregated values to master
 AggregatedValueOutputStream aggregatorOutput =
 new AggregatedValueOutputStream();
 for (Map.Entry entry : myAggregators) {
 int currentSize = aggregatorOutput.addAggregator(entry.getKey(),
 entry.getValue());
 if (currentSize > maxBytesPerAggregatorRequest) {
 requestProcessor.sendAggregatedValuesToMaster(
 aggregatorOutput.flush());
 } 
 }
 requestProcessor.sendAggregatedValuesToMaster(aggregatorOutput.flush());
 // Wait for master to receive aggregated values before proceeding
 serviceWorker.getWorkerClient().waitAllRequests();
 ownerAggregatorData.reset();
 }

调用关系如下: \

4. 大同步后,Master调用MasterAggregatorHandler类的prepareSusperStep(masterClient)方法,收集聚集器的值。方法内容如下:

 public void prepareSuperstep(MasterClient masterClient) {

 // 收集上次超级步的聚集值,为master compute 做准备
 for (AggregatorWrapper aggregator : aggregatorMap.values()) {
	// 如果是 Persistent Aggregator,则累加
	if (aggregator.isPersistent()) {
 aggregator.aggregateCurrent(aggregator.getPreviousAggregatedValue());
 }
 aggregator.setPreviousAggregatedValue(
 aggregator.getCurrentAggregatedValue());
 aggregator.resetCurrentAggregator();
 progressable.progress();
 }
 }
然后调用MasterCompute.compute()方法(可能会修改聚集器的值),在该方法内若根据聚集器的值调用了MasterCompute类的haltCompute()方法来终止MaterCompute,则表明要结束整个Job。那么Master就会通知所有Workers要结束整个作业;在该方法内若没有调用MasterCompute类的haltCompute()方法,则回到步骤1继续进行迭代。

备注:Job迭代结束条件有三,满足其一就行:
1) 达到最大迭代次数
2) 没有活跃顶点且没有消息在传递
3) 终止MasterCompute计算

总结:为解决在多个Aggregator条件下,Master成为系统瓶颈的问题。采取了把所有Aggregator派发给某一部分Workers,由这些Workers完成全局的聚集值的计算与发送,Master只需要与这些Workers进行简单数据通信即可,大大降低了Master的工作量。

追加:下面用图示方法说明上述执行过程。

实验条件:

1). 一个Master,四个Worker

2). 两个Aggregators,记为A1和A2。

1. Master把Aggregators发送给Workers,收到Aggregator的Worker就作为该Aggregator的Owner。下图中Master把A1发送给Worker1,A2发送给Worker3.那么Worker1就作为A1的Owner,Worker3就是A2的Owner。该步骤在MasterAggregatorHandler类的finishSuperStep(MasterClient masterClient) 方法中完成,使用的是SendAggregatorsToOwnerRequest 通信协议。注:每个Owner Worker 可能有多个聚集器。

\

图1 Master分发Aggregator

2. Workers接受Master发送的Aggregator,然后把Aggregator发送给其他Workers。Worker1要把A1分别发送给Worker2、Worker3和Worker4;Worker3要把A2分别发送给Worker1、Worker2和Worker4。该步骤在WorkerAggregatorHandler类的prepareSuperstep( WorkerAggregatorRequestProcessor requestProcessor)方法中完成,使用的是SendAggregatorsToMasterRequest 通信协议。此步骤完成后,每个Worker上都有了聚集器A1和A2(具体为上一个超步的全局最终聚集值)。

\

3. 每个Worker调用Vertex.compute()方法开始计算,收集本地的Aggregator聚集值。对聚集体A1来说,Worker1、Worker2、Worker3、Worker4的本地聚集值依次记为:

A11 、A12、 A13、A14;对聚集器A2来说,Worker1、Worker2、Worker3、Worker4的本地聚集值依次记为:

A21 、A22、 A23、A24 。计算完成后,每个Worker就要把本地的聚集值发送给聚集器的Owner,聚集器的Owner在接受的时候会合并聚集。那么A11 、A12、 A13、A14要发送给Worker1进行全局聚集得到A1,A21 、A22、 A23、A24 要发送给Worker3进行全局聚集得到A2’ 。

公式如下:

\

此部分采用的是SendWorkerAggregatorsRequest通信协议。Worker1和Worker3要把汇总的A1和A2的新值:A1’ 和A2’发送给Master,供下一次超级步的MasterCompute.compute()方法使用采用的是SendAggregatorsToMasterRequest通信协议。此部分在WorkerAggregatorHandler类的finishSuperstep( WorkerAggregatorRequestProcessor requestProcessor)方法中完成。过程如下图所示:

\

4. Master收到Worker1发送的A1’ 和Woker3发送的A2’后,此步骤在MasterAggregatorHandler类的prepareSusperStep(masterClient)方法中完成。然后调用MasterCompute.compute()方法,此方法可能会修改聚集器的值,如得到A1’’和A2’’。在masterCompute.compute()方法内若根据聚集器的值调用了MasterCompute类的haltCompute()方法来终止MaterCompute,则表明要结束整个Job。那么Master就会通知所有Workers要结束整个作业;在该方法内若没有调用MasterCompute类的haltCompute()方法,则回到步骤1继续进行迭代,继续把A1’’发送给Worker1,A2’’发送给Worker3。



完!

本人原创,转载请注明出处!欢迎大家加入Giraph 技术交流群: 228591158

Top