葡萄娱乐场HDFS读写进程,通过Thrift访问HDFS分布式文件系统的性子瓶颈分析

HDFS写过程:

image.png

壹、 使用HDFS提供的客户端Client, 向远程的Namenode发起逍客PC请求;

二、 Namenode会视境况重返文件的有个别依旧全部block列表, 对于各样block,
Namenode都会回去有该block拷贝的DataNode地址;

叁、 客户端Client会选择离客户端近年来的DataNode来读取block;
假设客户端自身就是DataNode, 那么将从地点平素获取数据;

4、 读取完当前block的数据后, 关闭当前的DataNode链接,
并为读取下二个block寻找最好的DataNode;

5、 当读完列表block后, 且文件读取还未曾达成,
客户端会一连向Namenode获取下一堆的block列表;

陆、 读取完一个block都会进展checksum验证, 若是读取datanode时出现错误,
客户端会通告Namenode,
下一场再从下一个全部该block拷贝的datanode继续读。

4、Thrift的造访流程:猜想版

HDFS写入流程:

image.png

壹、 使用HDFS提供的客户端Client, 向远程的Namenode发起TiguanPC请求

贰、 Namenode会检查要开创的文书是不是曾经存在, 创制者是还是不是有权力进行操作,
成功则会为文件创造二个记下, 不然会让客户端抛出非凡;

三、 当客户端起来写入文件的时候, 客户端会将文件切分成三个packets,
并在内部以多少队列“data queue( 数据队列) ”的款式管理那一个packets,
并向Namenode申请blocks, 获取用来存款和储蓄replicas的适当的datanode列表,
列表的尺寸依据Namenode中replication的设定而定;

四、 伊始以pipeline( 管道) 的样式将packet写入全数的replicas中。
开发库把packet以流的不二等秘书诀写入第3个datanode,
该datanode把该packet存款和储蓄之后,
再将其传递给在此pipeline中的下一个datanode, 直到终极3个datanode,
那种写多少的法子呈流水生产线的样式。

五、 最后贰个datanode成功存款和储蓄之后会回去一个ack packet( 确认队列) ,
在pipeline里传递至客户端, 在客户端的开销库内部维护着”ack queue”,
成功接到datanode再次来到的ack packet后会从”ack queue”移除相应的packet。

陆、 要是传输进度中, 有有个别datanode出现了故障,
那么当前的pipeline会被关闭,
出现故障的datanode会从日前的pipeline中移除,
剩余的block会继续剩下的datanode中继承以pipeline的款式传输,
同时Namenode会分配1个新的datanode, 保持replicas设定的多少。

7、 客户端达成数据的写入后, 会对数码流动调查用close()方法, 关闭数据流;

八、 只要写入了dfs.replication.min的复本数( 暗中同意为一) ,
写操作就会成功, 并且那些块能够在集群中异步复制, 直到达到其指标复本数(
dfs. replication的暗许值为三) , 因为namenode已经知晓文书由什么块组成,
所以它在回去成功前只须求等待数据块进行最少量的复制。


流程表达:

 1     public boolean write(ThriftHandle tout, String data) throws ThriftIOException {
 2       try {
 3         now = now();
 4         HadoopThriftHandler.LOG.debug("write: " + tout.id);
 5         FSDataOutputStream out = (FSDataOutputStream)lookup(tout.id);
 6         byte[] tmp = data.getBytes("UTF-8");
 7         out.write(tmp, 0, tmp.length);
 8         HadoopThriftHandler.LOG.debug("wrote: " + tout.id);
 9         return true;
10       } catch (IOException e) {
11         throw new ThriftIOException(e.getMessage());
12       }
13     }

6、示例程序

  上面是三个依据Thrift达成的HDFS客户端程序,实现了文本的造访和创制和读取

  葡萄娱乐场 1

流程表明:

1、HDFS文件读取流程

  1. 行使HDFS提供的客户端支付库Client,向远程的Namenode发起奥迪Q5PC请求;
  2. Namenode会视意况重返文件的有的照旧全部block列表,对于每一种block,Namenode都会再次回到有该block拷贝的DataNode地址;
  3. 客户端支出库Client会选用离客户端最相仿的DataNode来读取block;借使客户端本身正是DataNode,那么将从本土一向获取数据.
  4. 读取完当前block的数量后,关闭与眼下的DataNode连接,并为读取下一个block寻找最好的DataNode;
  5. 当读完列表的block后,且文件读取还不曾结束,客户端支付库会继续向Namenode获取下一堆的block列表。
  6. 读取完二个block都会展开checksum验证,倘若读取datanode时出现错误,客户端会布告Namenode,然后再从下二个全部该block拷贝的datanode继续读。

贰.ThriftServer调用HDFSClient接口API实现HDFS读写操作,操作流程如和三所示。

  ThriftHandle的两方到底是何人呢?是ThriftClient和DataNode?依然ThriftServer与DataNode?

 1     /**
 2       * Create a file and open it for writing, delete file if it exists
 3       */
 4     public ThriftHandle createFile(Pathname path, 
 5                                    short mode,
 6                                    boolean  overwrite,
 7                                    int bufferSize,
 8                                    short replication,
 9                                    long blockSize) throws ThriftIOException {
10       try {
11         now = now();
12         HadoopThriftHandler.LOG.debug("create: " + path +
13                                      " permission: " + mode +
14                                      " overwrite: " + overwrite +
15                                      " bufferSize: " + bufferSize +
16                                      " replication: " + replication +
17                                      " blockSize: " + blockSize);
18         FSDataOutputStream out = fs.create(new Path(path.pathname), 
19                                            new FsPermission(mode),
20                                            overwrite,
21                                            bufferSize,
22                                            replication,
23                                            blockSize,
24                                            null); // progress
25         long id = insert(out);
26         ThriftHandle obj = new ThriftHandle(id);
27         HadoopThriftHandler.LOG.debug("created: " + path + " id: " + id);
28         return obj;
29       } catch (IOException e) {
30         throw new ThriftIOException(e.getMessage());
31       }
32     }

三、关键词

  HDFSClient通过文件IO操作最后完成是经过平昔访问DataNode实行。

  与DataNode产生数据沟通的到底是ThriftServer仍然ThriftClient,要是是ThriftServer,那么四个ThriftClient并行访问时,ThriftServer必将成为HDFS访问的性质瓶颈;假若是ThriftClient直接待上访问DataNode,那么理论根据何在呢?

  ThriftHandle能够当做是Socket连接句柄,可是他的双边到底是何人吧?假若是ThriftClient代表的客户端则壹切OK,那么小编该怎么着验证呢?存疑待考!

  1. 动用HDFS提供的客户端支出库Client,向远程的Namenode发起中华VPC请求;
  2. Namenode会检查要创制的文本是还是不是早已存在,创造者是不是有权力举行操作,成功则会为文件创制3个记下,不然会让客户端抛出十二分;

  3. 客户端初叶写入文件的时候,开发库会将文件切分成多少个packets,并在中间以多少队列”data
    queue”的款式管理这一个packets,并向Namenode申请新的blocks,获取用来存款和储蓄replicas的适度的datanodes列表,
    列表的分寸依照在Namenode中对replication的装置而定。
  4. 初步以pipeline(管道)的方式将packet写入全体的replicas中。开发库把packet以流的法子写入第四个datanode,该datanode把该packet存款和储蓄之后,再将其传递给在此
    pipeline中的下多少个datanode,直到最终一个datanode,那种写多少的主意呈流水生产线的样式。
  5. 谈到底一个datanode成功存款和储蓄之后会回来1个ack
    packet,在pipeline里传递至客户端,在客户端的支付库内部维护着”ack
    queue”,成功接到datanode再次回到的ack packet后会从”ack
    queue”移除相应的packet。
  6. 要是传输进度中,有有个别datanode出现了故障,那么当前的pipeline会被关闭,出现故障的datanode会从当下的pipeline中移除,
    剩余的block会继续剩下的datanode中三番五次以pipeline的花样传输,同时Namenode会分配三个新的datanode,保持
    replicas设定的多寡。

2、HDFS文件写入流程

流程表达:

  Hadoop提供的HDFS布式文件存款和储蓄系统,提供了根据thrift的客户端访问帮忙,可是因为Thrift自己的走访特点,在高并发的走访景况下,thrift自己结构或者将会化为HDFS文件存款和储蓄系统的二个属性瓶颈。大家先来看一下壹不应用Thrfit格局访问HDFS文件系统的业务流程。

五、疑问

  叁.文书读取

  1.文件创制:

 7、源码分析

葡萄娱乐场 2

  1 // HdfsDemo.cpp : Defines the entry point for the console application.
  2 //
  3 
  4 #include "stdafx.h"
  5 #include <iostream>
  6 #include <string>
  7 #include <boost/lexical_cast.hpp>
  8 #include <protocol/TBinaryProtocol.h>
  9 #include <transport/TSocket.h>
 10 #include <transport/TTransportUtils.h>
 11 #include "ThriftHadoopFileSystem.h"
 12 
 13 #ifndef _WIN32_WINNT
 14 #define _WIN32_WINNT 0x0500
 15 #endif 
 16 using namespace std;
 17 using namespace apache::thrift;
 18 using namespace apache::thrift::protocol;
 19 using namespace apache::thrift::transport;
 20 
 21 int _tmain(int argc, _TCHAR* argv[])
 22 {
 23     if (argc < 3) 
 24     {
 25         std::cerr << "Invalid arguments!\n" << "Usage: DemoClient host port" << std::endl;
 26         //return -1;
 27     }
 28     boost::shared_ptr<TTransport> socket(new TSocket("192.168.230.133", 55952));//boost::lexical_cast<int>(argv[2])));
 29     boost::shared_ptr<TTransport> transport(new TBufferedTransport(socket));
 30     boost::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
 31     ThriftHadoopFileSystemClient client(protocol);
 32     try 
 33     {
 34         transport->open();
 35         Pathname path;
 36         //01_create directory
 37         path.__set_pathname("/user/hadoop");
 38         if(client.exists(path) == true)
 39         {
 40             printf("path is exists.\r\n");
 41         }
 42         else
 43         {
 44             printf("path is not exists.");
 45             //return 0;
 46         }
 47         //02_put file
 48         Pathname filepath;
 49         filepath.__set_pathname("/user/hadoop/in/test1.txt");
 50         /*
 51         FILE* localfile = fopen("E:\\project\\Hadoop\\HdfsDemo\\Debug\\hello.txt","rb");
 52         if (localfile == NULL)
 53         {
 54             transport->close();
 55             return 0;
 56         }
 57         ThriftHandle hdl;
 58         client.create(hdl,filepath);
 59         while (true)
 60         {
 61             char data[1024];
 62             memset(data,0x00,sizeof(data));
 63             size_t Num = fread(data,1,1024,localfile);
 64             if (Num <= 0)
 65             {
 66                 break;
 67             }
 68             client.write(hdl,data);
 69         }
 70         fclose(localfile);
 71         client.close(hdl);
 72         */
 73         //03_get file
 74         /*
 75         ThriftHandle hd2;
 76         FileStatus stat1;
 77         client.open(hd2,filepath);
 78         client.stat(stat1,filepath);
 79         int index = 0;
 80         while(true)
 81         {
 82             string data;
 83             if (stat1.length <= index)
 84             {
 85                 break;
 86             }
 87             client.read(data,hd2,index,1024);
 88 
 89             index += data.length();
 90             printf("==%s\r\n",data.c_str());
 91         }
 92         client.close(hd2);
 93         */
 94 
 95         //04_list files
 96         std::vector<FileStatus> vFileStatus;
 97         client.listStatus(vFileStatus,path);
 98         for (int i=0;i<vFileStatus.size();i++)
 99         {
100             printf("i=%d file=%s\r\n",i,vFileStatus[i].path.c_str());
101         }
102         transport->close();
103     } catch (const TException &tx) {
104     std::cerr << "ERROR: " << tx.what() << std::endl;
105     }
106     getchar();
107     return 0;
108 }

 

葡萄娱乐场 3

 捌、遗留难题

引言

 1     /**
 2      * read from a file
 3      */
 4     public String read(ThriftHandle tout, long offset,
 5                        int length) throws ThriftIOException {
 6       try {
 7         now = now();
 8         HadoopThriftHandler.LOG.debug("read: " + tout.id +
 9                                      " offset: " + offset +
10                                      " length: " + length);
11         FSDataInputStream in = (FSDataInputStream)lookup(tout.id);
12         if (in.getPos() != offset) {
13           in.seek(offset);
14         }
15         byte[] tmp = new byte[length];
16         int numbytes = in.read(offset, tmp, 0, length);
17         HadoopThriftHandler.LOG.debug("read done: " + tout.id);
18         return new String(tmp, 0, numbytes, "UTF-8");
19       } catch (IOException e) {
20         throw new ThriftIOException(e.getMessage());
21       }
22     }

1.ThriftClient客户端将操作命令传给ThriftServer。

  葡萄娱乐场 4

  写入时依赖的依旧ThriftHandle?  

  二.文件写入