溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

HDFS中怎么實現(xiàn)本地文件上傳

發(fā)布時間:2021-06-26 14:29:37 來源:億速云 閱讀:737 作者:Leah 欄目:云計算

本篇文章給大家分享的是有關(guān)HDFS中怎么實現(xiàn)本地文件上傳,小編覺得挺實用的,因此分享給大家學(xué)習(xí),希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。

public synchronized void write(byte b[], int off, int len)

        throws IOException {

            if (closed) { //校驗是否關(guān)閉了,關(guān)閉了自然不應(yīng)該再寫入數(shù)據(jù)了

                throw new IOException("Stream closed");

            }

            while (len > 0) { //這里的len就是指源緩沖區(qū)剩下的未寫完的數(shù)據(jù)長度,單位byte

              int remaining = BUFFER_SIZE - pos; //目的緩沖區(qū)里可以寫的字節(jié)數(shù)

              int toWrite = Math.min(remaining, len); //跟需要寫的字節(jié)數(shù)比較,取較小值作為真正要寫入的字節(jié)數(shù)

              System.arraycopy(b, off, outBuf, pos, toWrite); //開始復(fù)制來作為寫入到目的緩沖區(qū)操作

              pos += toWrite; //更新目的緩沖區(qū)位置指針

              off += toWrite; //更新源緩沖區(qū)位置指針

              len -= toWrite; //更新源緩沖區(qū)剩下的內(nèi)容長度

              filePos += toWrite; //計算整個文件的總的已經(jīng)寫入的長度(包括緩沖區(qū)里的內(nèi)容)

              if ((bytesWrittenToBlock + pos >= BLOCK_SIZE) ||

                  (pos == BUFFER_SIZE)) {

                flush(); //這里是2個條件引起flush,一個是總長度(已寫+緩存)超過一個塊大小,

                                              //第2個就是目的緩沖區(qū)已經(jīng)滿了,都么空間寫入了,自然需要flush了。

              }

            }

        }

 //友情提醒,這里的前半段寫入是能寫多少寫多少,寫完了再判斷!

為啥有2個判斷條件?想必很多人對緩沖區(qū)滿了很好理解,因為都沒剩余空間了

而對bytesWrittenToBlock + pos >= BLOCK_SIZE可能不是很清楚

這是因為一個Block寫滿了就要另起爐灶,重新開一個Block.

flush()函數(shù)暫時不解釋,后面再解釋!

---

 public synchronized void write(int b) throws IOException {

            if (closed) {//仍然是校驗是否關(guān)閉

                throw new IOException("Stream closed");

            }

            if ((bytesWrittenToBlock + pos == BLOCK_SIZE) ||

                (pos >= BUFFER_SIZE)) {

                flush();

            }//仍然是2個條件的校驗

            outBuf[pos++] = (byte) b;

            filePos++;//這2句的意義在于真正的寫入到目的緩沖區(qū)里

                         不過為啥不把這2段調(diào)一下順序更好理解?果然思維獨特!

        }

---

 public synchronized void flush() throws IOException {

            if (closed) {

                throw new IOException("Stream closed");

            }//檢驗是否關(guān)閉,老規(guī)矩

            if (bytesWrittenToBlock + pos >= BLOCK_SIZE) {

                flushData(BLOCK_SIZE - bytesWrittenToBlock);

            }//如果需要新起1個Block的話,就把剩下的不足字節(jié)數(shù)先寫上

            if (bytesWrittenToBlock == BLOCK_SIZE) {

                endBlock();//然后關(guān)閉當(dāng)前塊,新起一塊

            }

            flushData(pos);//對當(dāng)前塊繼續(xù)寫剩下的

        } 

---

繼續(xù)看別的函數(shù)

在看別的函數(shù)之前,首先希望讀者先建立一個0.1.0中文件的存儲機制。

在讀取本地文件上傳到HDFS中,文件流是這樣的。

本地文件--->本地內(nèi)存緩沖區(qū)Buffer--->本地文件--->上傳到遠(yuǎn)程HDFS系統(tǒng)。

而本地內(nèi)存緩沖區(qū)Buffer--->本地文件就是flushData做的事情,請再復(fù)習(xí)下flush函數(shù),然后再接下來分析flushData.

PS:看代碼比寫代碼累,看代碼是了解別人的思維,寫代碼是把自己的思維實現(xiàn)起來。。。 

private synchronized void flushData(int maxPos) throws IOException {

            int workingPos = Math.min(pos, maxPos);//計算要寫入的字節(jié)數(shù),真是多此一舉。

            if (workingPos > 0) {//如果確實需要寫的話

                //

                // To the local block backup, write just the bytes

                //

                backupStream.write(outBuf, 0, workingPos);//寫入到本地文件

                //注意,請認(rèn)真閱讀backupStream的初始化過程,是一個本地文件。

                //也就是說計劃把內(nèi)存緩沖區(qū)里的內(nèi)容寫到本地文件中,寫完一個block再發(fā)送給HDFS.

                //聰明的讀者應(yīng)該想到最后一個block的大小是<=blockSize的。

                // Track position

                //

                bytesWrittenToBlock += workingPos;//更新寫入到block塊的字節(jié)數(shù),

                //尤其要強調(diào),當(dāng)一個塊結(jié)束后,這個變量就會重置為0,你懂的。

                System.arraycopy(outBuf, workingPos, outBuf, 0, pos - workingPos);

                //字節(jié)前挪移到偏移量為0的位置,方便后面IO操作,你懂得,不解釋。

                pos -= workingPos;//相關(guān)變量都需要更新

            }

        }

---------------

接下來到了比較核心的函數(shù)endBlock(); 意思是關(guān)閉當(dāng)前塊,新起一塊,下面來看看具體的代碼!

private synchronized void endBlock() throws IOException {

            //

            // Done with local copy

            //

            backupStream.close();//關(guān)閉本地文件系統(tǒng)的臨時文件

            //

            // Send it to datanode//準(zhǔn)備發(fā)送給datanode了。

            //

            boolean mustRecover = true;//定義一個哨兵變量

            while (mustRecover) {//需要讀取當(dāng)前文件時

                nextBlockOutputStream();

           因為這個函數(shù)到后面才分析,所以提把背景知識補充好,這個函數(shù)

           主要是初始化了一對IO流句柄,這個流是當(dāng)前shell和遠(yuǎn)程datanode

           之間的TCP連接,這對IO流句柄就是 blockStream + blockReplyStream,

           分別對應(yīng)著輸出流和輸入流,輸出流用來輸出文件頭和文件內(nèi)容,輸入流是

           用來讀取響應(yīng)。 

                InputStream in = new FileInputStream(backupFile);//既然第一行關(guān)閉了寫,

                現(xiàn)在就可以開始讀了

                try {

                    byte buf[] = new byte[BUFFER_SIZE];//還是局部的IO緩沖區(qū)

                    int bytesRead = in.read(buf);//從本地文件中讀取內(nèi)容

                    while (bytesRead > 0) {//大于0?

                        blockStream.writeLong((long) bytesRead);//寫入字節(jié)數(shù)

                        blockStream.write(buf, 0, bytesRead);//寫入緩沖區(qū)的內(nèi)容

                        bytesRead = in.read(buf);//繼續(xù)從本地文件中讀取

                    }

                    internalClose();//跟NameNode和DataNode的交互,表示關(guān)閉

                    mustRecover = false;//表示任務(wù)結(jié)束

                } catch (IOException ie) {

                    handleSocketException(ie);

                } finally {

                  in.close();//關(guān)閉當(dāng)前文件的輸入流

                }

            }

            //

            // Delete local backup, start new one

            //下面4行是從新建立起本地文件系統(tǒng)的文件緩沖系統(tǒng),不解釋

            backupFile.delete();

            backupFile = newBackupFile();

            backupStream = new FileOutputStream(backupFile);

            bytesWrittenToBlock = 0;

        }

在閱讀以上代碼之后,我個人認(rèn)為如果用C語言來寫這段邏輯的話,我會直接調(diào)用sendfile來實現(xiàn)文件傳輸。

當(dāng)然JAVA的API滯后性以及OS當(dāng)時或許都不提供這種方式吧,反正現(xiàn)在的內(nèi)核都提供了。

---------------------------------------

 那么接下來分析的是函數(shù):nextBlockOutputStream()

private synchronized void nextBlockOutputStream() throws IOException {

            boolean retry = false;//不解釋

            long start = System.currentTimeMillis();//當(dāng)前開始時間

            do {

                retry = false;//重置為false 

                long localstart = System.currentTimeMillis();//當(dāng)前開始時間

                boolean blockComplete = false;//標(biāo)注塊是否OK

                LocatedBlock lb = null;    //初始化為null          

                while (! blockComplete) {//如果未結(jié)束

                    if (firstTime) {//如果是第一次開啟一個文件

                        lb = namenode.create(src.toString(), clientName.toString(), localName, overwrite);//創(chuàng)建一個文件 

                    } else {

                        lb = namenode.addBlock(src.toString(), localName);

                    }//增加一個block

                    if (lb == null) {//如果找不到

                        try {

                            Thread.sleep(400);//就沉睡400毫秒

                            if (System.currentTimeMillis() - localstart > 5000) {

                                LOG.info("Waiting to find new output block node for " + (System.currentTimeMillis() - start) + "ms");

                            }

                        } catch (InterruptedException ie) {

                        }

                    } else {

                        blockComplete = true;//設(shè)置blockComplete為true.解釋為找到了一個block

                    }

                }

                block = lb.getBlock();//從lb中獲取block的信息

                DatanodeInfo nodes[] = lb.getLocations();//從lb中獲取block要存儲的DataNode數(shù)組

                //

                // Connect to first DataNode in the list.  Abort if this fails.

                //請注意上面這句的意思:連接第一個數(shù)據(jù)節(jié)點,

                //為啥?數(shù)據(jù)傳輸采用計算機組成原理的菊花鏈模式

                InetSocketAddress target = DataNode.createSocketAddr(nodes[0].getName().toString());//解析

                try {

                    s = new Socket();

                    s.connect(target, READ_TIMEOUT);//連接第一個DataNode

                    s.setSoTimeout(READ_TIMEOUT);//設(shè)置讀取時間

                } catch (IOException ie) {//異常這里就不分析了

                    // Connection failed.  Let's wait a little bit and retry

                    try {

                        if (System.currentTimeMillis() - start > 5000) {

                            LOG.info("Waiting to find target node: " + target);

                        }

                        Thread.sleep(6000);

                    } catch (InterruptedException iex) {

                    }

                    if (firstTime) {

                        namenode.abandonFileInProgress(src.toString());

                    } else {

                        namenode.abandonBlock(block, src.toString());

                    }

                    retry = true;

                    continue;

                }

                //此時已經(jīng)成功連接到了遠(yuǎn)程DataNode節(jié)點,bingo!

                // Xmit header info to datanode

                //

                DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));

//獲取輸出流句柄

                out.write(OP_WRITE_BLOCK);//輸出行為標(biāo)識

                out.writeBoolean(false);//false?

                block.write(out);//寫入block信息,注意:是把從namenode獲取到的block寫給DataNode

                out.writeInt(nodes.length);//這一樣和下面這一行是為了寫入所有存儲及備份的DataNode

                for (int i = 0; i < nodes.length; i++) {

                    nodes[i].write(out);//不解釋

                }

                out.write(CHUNKED_ENCODING);//寫CHUNKED_ENCODING

                bytesWrittenToBlock = 0;//重置為0

                blockStream = out;//把句柄賦值給類的局部變量供后續(xù)使用

                blockReplyStream = new DataInputStream(new BufferedInputStream(s.getInputStream()));//同理,不解釋

            } while (retry);

            firstTime = false;//firstTime在至少有一個塊信息返回后就為false

=================================================== 

接下來要分析的函數(shù)是

private synchronized void internalClose() throws IOException {

            blockStream.writeLong(0);//表明長度結(jié)束了

            blockStream.flush();//把緩沖內(nèi)容全部輸出。

            long complete = blockReplyStream.readLong();//讀取響應(yīng)

            if (complete != WRITE_COMPLETE) {//如果不是結(jié)束

                LOG.info("Did not receive WRITE_COMPLETE flag: " + complete);

                throw new IOException("Did not receive WRITE_COMPLETE_FLAG: " + complete);

            }

            LocatedBlock lb = new LocatedBlock();//創(chuàng)建一個新對象

            lb.readFields(blockReplyStream);//根據(jù)響應(yīng)流來賦值

            namenode.reportWrittenBlock(lb);//向namenode報告寫入成功

            s.close();//關(guān)閉此流

            s = null;

        }

================

最后就是close函數(shù)

public synchronized void close() throws IOException {

            if (closed) {

                throw new IOException("Stream closed");

            }//校驗是否關(guān)閉了

            flush();//盡可能的輸出內(nèi)容

            if (filePos == 0 || bytesWrittenToBlock != 0) {

              try {

                endBlock();//結(jié)束一個塊

              } catch (IOException e) {

                namenode.abandonFileInProgress(src.toString());//拋棄此file

                throw e;

              }

            }

            backupStream.close();//關(guān)閉流

            backupFile.delete();//刪除文件

            if (s != null) {

                s.close();//不解釋

                s = null;

            }

            super.close();

            long localstart = System.currentTimeMillis();

            boolean fileComplete = false;

            while (! fileComplete) {//循環(huán)報告文件寫完了

                fileComplete = namenode.complete(src.toString(), clientName.toString());

                if (!fileComplete) {

                    try {

                        Thread.sleep(400);

                        if (System.currentTimeMillis() - localstart > 5000) {

                            LOG.info("Could not complete file, retrying...");

                        }

                    } catch (InterruptedException ie) {

                    }

                }

            }

            closed = true;

        }

以上就是HDFS中怎么實現(xiàn)本地文件上傳,小編相信有部分知識點可能是我們?nèi)粘9ぷ鲿姷交蛴玫降?。希望你能通過這篇文章學(xué)到更多知識。更多詳情敬請關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI