Hadoop原始碼學習筆記——Socket到RPC呼叫?

Hadoop是一個分散式程式,分佈在多臺機器上執行,事必會涉及到網路程式設計。那這裡如何讓網路程式設計變得簡單、透明的呢?

超人學院吳超老師為你講解:

網路程式設計中,首先我們要學的就是Socket程式設計,這是網路程式設計中最底層的程式介面,分為伺服器端和客戶端,伺服器負責監聽某個埠,客戶端負責連線伺服器上的某個埠,一旦連線通過後,伺服器和客戶端就可以雙向通訊了。

方法/步驟

1. ServerSocket server = new ServerSocket(8111);

2. Socket socket = server.accept();

3.

4. //由Socket物件得到輸入流,並構造相應的BufferedReader物件

5. BufferedReader is = new BufferedReader(new InputStreamReader(socket.getInputStream()));

6. //由Socket物件得到輸出流,並構造PrintWriter物件

7. PrintWriter os = new PrintWriter(socket.getOutputStream());

8.

9. while(true){

10. String inline = is.readLine();

11. System.out.println(" 收到資訊:" + inline);

12. //伺服器反回

13. os.println("serverSend:" + inline);

14. os.flush();

15. if (inline == "bye")

16. break;

17. }

18. os.close();

19. is.close();

20. socket.close();

21. server.close();

22. System.out.println("伺服器退出");

1. Socket socket = new Socket("127.0.0.1",8111);

2.

3. //由Socket物件得到輸入流,並構造相應的BufferedReader物件

4. BufferedReader is = new BufferedReader(new InputStreamReader(socket.getInputStream()));

5. //由Socket物件得到輸出流,並構造PrintWriter物件

6. PrintWriter os = new PrintWriter(socket.getOutputStream());

7. BufferedReader sin=new BufferedReader(new InputStreamReader(System.in));

8. while(true){

9. System.out.println("請輸入:");

10. String line = sin.readLine();

11. os.println(line);

12. os.flush();

13. String inline = is.readLine();

14. System.out.println("伺服器獲取值:" + inline);

15. if (line=="bye")

16. break;

17. }

18. os.close();

19. is.close();

20. socket.close();

21. System.out.println("客戶端退出");

這兩段程式碼分別帖入兩個類中,分開執行,先執行伺服器端,再執行客戶端,就可以互發訊息了。

觀察下程式碼,發現程式碼中下面4~20行邏輯是一至的,都是通過流來通訊,所以Socket中不同的是開始地方,伺服器是通過server.accept()來獲取Socket,而客戶端是通過直接建立Socket物件的。

這段程式碼,其本執行是沒問題的,但存在一個問題,就是當客戶端接入時伺服器端的accept函式才走下去,不然的話,會一直處於卡死等待狀態。包括getInputStream函式,也會等待雙方接通後,才往下走。除非等到客戶端接入,或中斷。當然有人會說,可以引入多執行緒啊,沒錯,是可以,但是想一下,是不是每個客戶接入都得有一個執行緒? 否則少一個執行緒,就會有一堆的卡著。所以這種方式不適合在大最客戶端接入的情況。

在JDK1.4引入了非阻塞的通訊方式,這樣使得伺服器端只需要一個執行緒就能處理所有客戶端socket的請求。

下面是幾個需要用到的核心類:

· ServerSocketChannel: ServerSocket 的替代類, 支援阻塞通訊與非阻塞通訊.

· SocketChannel: Socket 的替代類, 支援阻塞通訊與非阻塞通訊.

· Selector: 為ServerSocketChannel 監控接收客戶端連線就緒事件, 為 SocketChannel 監控連線伺服器就緒, 讀就緒和寫就緒事件.

· SelectionKey: 代表 ServerSocketChannel 及 SocketChannel 向 Selector 註冊事件的控制代碼. 當一個 SelectionKey 物件位於Selector 物件的 selected-keys 集合中時, 就表示與這個 SelectionKey 物件相關的事件發生了.在SelectionKey 類中有幾個靜態常量

· SelectionKey.OP_ACCEPT->客戶端連線就緒事件 等於監聽serversocket.accept()返回一個socket

· SelectionKey.OP_CONNECT->準備連線伺服器就緒跟上面類似,只不過是對於socket的相當於監聽了socket.connect()

· SelectionKey.OP_READ->讀就緒事件, 表示輸入流中已經有了可讀資料, 可以執行讀操作了

· SelectionKey.OP_WRITE->寫就緒事件

所以伺服器端程式碼就可以升一下級了,變成如下:

1. public class SocketChannelTest implements Runnable {

2.

3. @Override

4. public void run() {

5. while (true) {

6. try {

7. selector.select();

8. Set keys = selector.selectedKeys();

9. Iterator iter = keys.iterator();

10. SocketChannel sc;

11. while (iter.hasNext()) {

12. SelectionKey key = iter.next();

13. if (key.isAcceptable())

14. ; // 新的連線

15. else if (key.isReadable())

16. ;// 可讀

17. iter.remove(); // 處理完事件的要從keys中刪去

18. }

19. } catch (Exception e) {

20. e.printStackTrace();

21. }

22. }

23. }

24. static Selector selector;

25.

26. public static void main(String[] args) throws IOException,

27. InterruptedException {

28. selector = Selector.open(); // 靜態方法 例項化selector

29. ServerSocketChannel serverChannel = ServerSocketChannel.open();

30. serverChannel.configureBlocking(false); // 設定為非阻塞方式,如果為true 那麼就為傳統的阻塞方式

31. serverChannel.socket().bind(new InetSocketAddress(8001)); // 繫結IP 及 埠

32. serverChannel.register(selector, SelectionKey.OP_ACCEPT); // 註冊

33. // OP_ACCEPT事件

34. Thread thd = new Thread(new SocketChannelTest());

35. thd.start();// 開啟執行緒 處理請求

36. thd.join();

37. }

38. }

好,這樣通訊程式碼簡化了。但繼續想,我們通訊的目的是什麼?客戶端發一個指令,伺服器執行一些內容,然後把結果返回給客戶端。這不就像呼叫一下函式麼,呼叫函式名、傳入引數、返回值。

這個就稱之為遠端方法呼叫(RPC Remote Procedure Call Protocol),毫無疑問,這個RPC實現肯定是基於上面的這個Socket的。至於具體如何實現呢,我們看下面的分解。

在看實現之前,我們先看一下,這個RPC是如何用的,如何做到呼叫透明的:

我們在src下新建一個RPCTest的包,定義一個功能介面IRPCTestEntity.java:

1. package RPCTest;

2. import org.apache.hadoop.ipc.VersionedProtocol;

3. public interface IRPCTestEntity extends VersionedProtocol {

4. int Calc(int x,int y);

5. }

該介面中有一個Calc的函式。

定義一個實現類RPCTestEntity.java:

1. package RPCTest;

2. import java.io.IOException;

3. public class RPCTestEntity implements IRPCTestEntity{

4. @Override

5. public long getProtocolVersion(String protocol, long clientVersion) throws IOException {

6. return 0;

7. }

8.

9. public int Calc(int x,int y){

10. int z =0 ;

11. z = x + y;

12. return z;

13. }

14.

15. }

這個類中實現了Calc函式,執行內容為將x,y相加,將和返回。

我們再定義一個伺服器類(RPCTestSvr.java),將該實現類註冊成RPC服務:

1. package RPCTest;

2. import java.io.IOException;

3.

4. public class RPCTestSvr {

5. public static void main(String[] args) throws IOException, InterruptedException {

6. RPCTestEntity obj = new RPCTestEntity();

7. Configuration conf = new Configuration();

8. Server server = RPC.getServer(obj, "", 9001, conf);

9. server.start();

10. server.join();

11. }

12. }

程式碼比較簡單,定義了一個RPCTestEntity的實體,然後RPC建立一個Server,傳入實體物件,然後這個服務就呼叫join卡住,用於不斷接收請求。 建立完後,就可把這個"伺服器"啟動起來了。

再建立一個客戶端(RPCTestClient.java):

1. package RPCTest;

2.

3. import java.io.IOException;

4. import java.net.InetSocketAddress;

5.

6. import org.apache.hadoop.conf.Configuration;

7. import org.apache.hadoop.ipc.RPC;

8. import org.apache.hadoop.ipc.VersionedProtocol;

9.

10. public class RPCTestClient {

11. public static void main(String[] args) throws IOException {

12. InetSocketAddress addr = new InetSocketAddress("127.0.0.1",9001);

13. Configuration conf = new Configuration();

14. VersionedProtocol obj = RPC.getProxy(IRPCTestEntity.class, 0, addr, conf);

15. IRPCTestEntity ent = (IRPCTestEntity)obj;

16. int x = ent.Calc(5, 6);

17. System.out.println(x);

18. }

19. }

這裡,我們通過RPC.getProxy函式獲了一個IRPCTestEntity的介面例項,然後就可以直接呼叫了。

執行後,發現這個值馬上返回了過來,同時在"伺服器"端也會收到一定的請求資訊。說明兩者之間通了。

仔細看,這個客戶端中,整個過程就沒有涉及到RPCTestEntity這個實現的實體,換句話說,客戶端產生的是一個虛擬的實現類,然後呼叫起來了。

OK,示例程式跑起來了,也帶給我們幾個問題,1、這個客戶端中的obj是什麼物件?2、為什麼我們呼叫obj物件中的函式(Calc)會跑到伺服器上執行,如何實現的?

底層的通訊,我們是知道的,肯定用socket,用它能夠傳遞各種資料。如何與函式關聯呢? 我們進入getProxy函式,

我們看到這個getProxy函式中,返回了VersionedProtocol介面的物件,從字面意思,這個Proxy意為代理, 所以我們得到的obj就是一個代理類。同時也看出,要作為RPC處理物件,這個介面必實現VersionedProtocol(簡單地看下里面,只有一個函式,返回版本號,是用於判斷雙方版本所用,只有版本匹配,才能呼叫)。

其建立可以看到,用到了:

Proxy.newProxyInstance(

protocol.getClassLoader(), new Class[] { protocol },

new Invoker(addr, ticket, conf, factory));

然後這個代理類,就自動實現了偉放的protocol這個介面型別。然後當我們呼叫代理類中的函式時,這個傳入的Invoker類,就會收到通知,通知裡包含了呼叫資訊,我們進入Invoker中看一下:

private static class Invoker implements InvocationHandler

這是一個寫在RPC類中的內部類,且是私有的,意思就是隻為這個RPC呼叫,其實現的規定介面InvocationHandler,那麼就要實現規定的函式Invoke咯:

1. public Object invoke(Object proxy, Method method, Object[] args)

2. throws Throwable {

3. final boolean logDebug = LOG.isDebugEnabled();

4. long startTime = 0;

5. if (logDebug) {

6. startTime = System.currentTimeMillis();

7. }

8.

9. ObjectWritable value = (ObjectWritable)

10. client.call(new Invocation(method, args), address,

11. method.getDeclaringClass(), ticket);

12. if (logDebug) {

13. long callTime = System.currentTimeMillis() - startTime;

14. LOG.debug("Call: " + method.getName() + "" + callTime);

15. }

16. return value.get();

17. }

這個invoke函式,就是當我們呼叫代理類中的函式(obj.Calc)時,會收到的請求,看下引數,傳入的有,Method(函式),args(引數),一應俱全,有了這些內容後,就可以呼叫底層的Socket,將這些資訊打包起來(放入的Invocation類)中,一併發向伺服器中。

同時,伺服器端中,就比較容易了,在收到請求後,就可以解析出要呼叫的函式和引數,然後通過反射來呼叫在伺服器一開始註冊上的物件中的函式,再將返回值通過Socket傳回客戶端,再由這個invoke函式將值返回。

OK,這個幾個點想通了,整個過程就容易理解了。總之:

伺服器端——註冊服務:RPC.getServer(obj, "", 9001, conf);

客戶端——取得代理類:obj = RPC.getProxy()

通過這樣的包裝後,網路訪問就非常透明瞭。

相關問題答案