Hadoop은 이미 알려질대로 잘 알려진 분산 컴퓨팅 프레임워크입니다. 많은 사람들이 Hadoop 하면 MapReduce 프로그래밍을 주로 떠올리지만 자체적으로 제공하는 Hadoop RPC와 분산 파일 시스템인 HDFS를 가지고도 재미있는 것을 시도해 볼 수 있을 것 같습니다. 본 포스팅에서는 그 중에서 Hadoop RPC를 이용한 간단한 서버 클라이언트 프로그램의 구현방법을 소개합니다.
Hadoop RPC Concept
Hadoop RPC는 일반적으로 하나의 프로토콜 인터페이스(interface)와 하나의 Server 그리고 하나 이상의 Client(들)로 동작합니다. Hadoop RPC 서버의 인스턴스와 클라이언트 프록시의 인스턴스는 org.apache.hadoop.ipc.RPC 라는 클래스를 통해 얻을 수 있는데 내부적으로는 java reflection을 통해 구현되어 있습니다. 그리고 RPC method의 파라메터와 리턴 값은 오직 자바 primitive type들(예: int, long, String 등등)과 Writable 인터페이스를 구현한 구상클래스만 될 수 있습니다. 또한 Hadoop RPC는 자체적으로 서버와 클라이언트에 대한 기본적인 기능을 제공합니다. 따라서 복잡하게 스레드나 소켓 통신을 직접 구현할 필요가 없으며 개발자는 오로지 RPC 프로토콜 인터페이스와 RPC 메소드들에 대한 내용만 채워 넣으면 됩니다.
Implementation of RPC Protocol
RPC Protocol은 인터페이스로 정의되어야 하며 이 인터페이스는 org.apache.hadoop.ipc.VersionedProtocol을 상속하여야 합니다. VersionedProtocol은 자체적으로 getProtocolVersion() 메소드를 가지고 있는데 이 메소드는 프로토콜의 버전이 다양할 경우 서버-클라이언트가 다른 버전의 프로토콜로 통신하는 것을 방지하는 역할을 합니다.
RPC 프로토콜은 다음 예제와 같이 간단히 만들 수 있습니다. 아래 예제는 String 값을 반환하는 heartBeat()라는 하나의 RPC 메소드를 가진 RPC 프로토콜 인터페이스입니다.
import org.apache.hadoop.ipc.VersionedProtocol; public interface RPCProtocol extends VersionedProtocol { public long versionID=0; public String heartBeat() throws IOException; }
Implementation of RPC Server
위에서 설명한 RPC 프로토콜의 서버 역할을 할 구상 클래스를 구현합니다. 서버 클래스는 간단히 위에서 정의한 RPCProtocol 인터페이스를 implements 하면 됩니다 (아래 예제 참조).
import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC.Server; public class TestServer implements RPCProtocol { @Override public String heartBeat() throws IOException { return "Hello"; } @Override public long getProtocolVersion(String arg0, long arg1) throws IOException { return 0; } /** * @param args * @throws IOException * @throws InterruptedException */ public static void main(String[] args) throws IOException, InterruptedException { TestServer s = new TestServer(); Configuration conf = new Configuration(); Server server = RPC.getServer(s, "localhost", 10000, conf); server.start(); server.join(); } }
RPCProtocol 인터페이스에서 정의했던 String heartBeat() 메소드 역시 구현되어 있습니다. 반환 값으로 “Hello”가 호출한 RPC 클라이언트에게 전달 될 것입니다.
서버의 시동은 main 메소드에 구현되어 있습니다. 우선 프로토콜의 구상클래스(TestServer)의 인스턴스를 생성하고 RPC.getServer()에 인자로 전달합니다. 또한 getServer 메소드는 추가적으로 서버가 binding할 IP와 port 번호를 인자로 받으며 Server 클래스의 인스턴스를 반환합니다(내부적으로는 TestServer 클래스의 인스턴스에 대한 Listener 스레드를 생성하여 파라메터로 전달된 IP 및 port 번호와 바인딩 시킵니다. 그리고 RPC 콜이 있을 때마다 TestServer의 메소드를 콜하게 됩니다. 처리 결과는 Responder 스레드를 통해 반환하게 됩니다).
RPC.getServer 메소드의 원형은 다음과 같습니다.
static RPC.Server |
getServer(Object instance, String bindAddress, int port, Configuration conf) |
Implementation of RPC Client
클라이언트는 RPC.waitForProxy 메소드를 통해서 간단히 얻을 수 있습니다. 그리고 클라이언트는 반환값으로 받은 proxy 인스턴스를 이용해서 손쉽게 RPC method를 콜하고 서버로부터 응답을 받아 올 수 있습니다.
static VersionedProtocol |
getProxy(Class<?> protocol, long clientVersion, InetSocketAddress addr, UserGroupInformation ticket,Configuration conf, SocketFactory factory) |
import java.io.IOException; import java.net.InetSocketAddress; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ipc.RPC; public class TestClient { /** * @param args * @throws IOException * @throws InterruptedException */ public static void main(String[] args) throws IOException, InterruptedException { Configuration conf = new Configuration(); InetSocketAddress addr = new InetSocketAddress("localhost", 10000); RPCProtocol rpc = (RPCProtocol) RPC.waitForProxy(RPCProtocol.class, RPCProtocol.versionID, addr, conf); String msg = null; while(true) { Thread.sleep(1000); msg = rpc.heartBeat(); System.out.println(msg); } } }
위 예제는 프록시 인스턴스 변수인 rpc를 통해 손쉽게 rpc.heartBeat() 메소드를 실행하고 서버로 부터 결과를 얻는 내용을 설명합니다.
Test
서버를 먼저 실행하고 클라이언트를 실행하면 됩니다. 사실 순서를 바꿔 실행해도 크게 문제 되지 않습니다. Hadoop RPC의 클라이언트는 먼저 실행되었을 경우 RPC 서버에 접속이 될 때까지 1초 단위로 반복하여 접속 시도를 하게 됩니다.
정상적으로 수행되는 경우 다음과 같은 메시지를 확인할 수 있습니다.
Hello Hello Hello Hello Hello ...
Recent Comments