溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

java實現(xiàn)memcache服務器的示例代碼

發(fā)布時間:2020-09-18 01:58:52 來源:腳本之家 閱讀:173 作者:raledong 欄目:編程語言

什么是Memcache?

Memcache集群環(huán)境下緩存解決方案

Memcache是一個高性能的分布式的內存對象緩存系統(tǒng),通過在內存里維護一個統(tǒng)一的巨大的hash表,它能夠用來存儲各種格式的數(shù)據(jù),包括圖像、視頻、文件以及數(shù)據(jù)庫檢索的結果等。簡單的說就是將數(shù)據(jù)調用到內存中,然后從內存中讀取,從而大大提高讀取速度?! ?br />

 Memcache是danga的一個項目,最早是LiveJournal 服務的,最初為了加速 LiveJournal 訪問速度而開發(fā)的,后來被很多大型的網(wǎng)站采用。  

Memcached是以守護程序方式運行于一個或多個服務器中,隨時會接收客戶端的連接和操作

為什么會有Memcache和memcached兩種名稱?

其實Memcache是這個項目的名稱,而memcached是它服務器端的主程序文件名,知道我的意思了吧。一個是項目名稱,一個是主程序文件名,在網(wǎng)上看到了很多人不明白,于是混用了。 

Memcached是高性能的,分布式的內存對象緩存系統(tǒng),用于在動態(tài)應用中減少數(shù)據(jù)庫負載,提升訪問速度。Memcached由Danga Interactive開發(fā),用于提升LiveJournal.com訪問速度的。LJ每秒動態(tài)頁面訪問量幾千次,用戶700萬。Memcached將數(shù)據(jù)庫負載大幅度降低,更好的分配資源,更快速訪問。

這篇文章將會涉及以下內容:

  1. Java Socket多線程服務器
  2. Java IO
  3. Concurrency
  4. Memcache特性和協(xié)議

Memcache

Memcache is an in-memory key-value store for small chunks of arbitrary data (strings, objects) from results of databasecalls, API calls, or page rendering.

即內存緩存數(shù)據(jù)庫,是一個鍵值對數(shù)據(jù)庫。該數(shù)據(jù)庫的存在是為了將從其他服務中獲取的數(shù)據(jù)暫存在內存中,在重復訪問時可以直接從命中的緩存中返回。既加快了訪問速率,也減少了其他服務的負載。這里將實現(xiàn)一個單服務器版本的Memcache,并且支持多個客戶端的同時連接。

客戶端將與服務器建立telnet連接,然后按照Memcache協(xié)議與服務器緩存進行交互。這里實現(xiàn)的指令為get,set和del。先來看一下各個指令的格式

set

set屬于存儲指令,存儲指令的特點時,第一行輸入基本信息,第二行輸入其對應的value值。

set <key> <flags> <exptime> <bytes> [noreply]\r\n
<value>\r\n

如果存儲成功,將會返回STORED,如果指令中包含noreply屬性,則服務器將不會返回信息。

該指令中每個域的內容如下:

  1. key: 鍵
  2. flags: 16位無符號整數(shù),會在get時隨鍵值對返回
  3. exptime: 過期時間,以秒為單位
  4. bytes:即將發(fā)送的value的長度
  5. noreply:是否需要服務器響應,為可選屬性

如果指令不符合標準,服務器將會返回ERROR。

get

get屬于獲取指令,該指令特點如下:

get <key>*\r\n

它支持傳入多個key的值,如果緩存命中了一個或者多個key,則會返回相應的數(shù)據(jù),并以END作為結尾。如果沒有命中,則返回的消息中不包含該key對應的值。格式如下:

VALUE <key> <flags> <bytes>\r\n
<data block>\r\n
VALUE <key> <flags> <bytes>\r\n
<data block>\r\n
END
del

刪除指令,該指令格式如下:

del <key> [noreply]\r\n

如果刪除成功,則返回DELETED\r\n,否則返回NOT_FOUND。如果有noreply參數(shù),則服務器不會返回響應。

JAVA SOCKET

JAVA SOCKET需要了解的只是包括TCP協(xié)議,套接字,以及IO流。這里就不詳細贅述,可以參考我的這系列文章,也建議去閱讀JAVA Network Programming。一書。

代碼實現(xiàn)

這里貼圖功能出了點問題,可以去文末我的項目地址查看類圖。

這里采用了指令模式和工廠模式實現(xiàn)指令的創(chuàng)建和執(zhí)行的解耦。指令工廠將會接收commandLine并且返回一個Command實例。每一個Command都擁有execute方法用來執(zhí)行各自獨特的操作。這里只貼上del指令的特殊實現(xiàn)。

 /**
 * 各種指令
 * 目前支持get,set,delete
 *
 * 以及自定義的
 * error,end
 */
public interface Command {

  /**
   * 執(zhí)行指令
   * @param reader
   * @param writer
   */
  void execute(Reader reader, Writer writer);

  /**
   * 獲取指令的類型
   * @return
   */
  CommandType getType();
}
/**
 * 指令工廠 單一實例
 */
public class CommandFactory {

  private static CommandFactory commandFactory;
  private static Cache<Item> memcache;
  private CommandFactory(){}

  public static CommandFactory getInstance(Cache<Item> cache) {
    if (commandFactory == null) {
      commandFactory = new CommandFactory();
      memcache = cache;
    }
    return commandFactory;
  }

  /**
   * 根據(jù)指令的類型獲取Command
   * @param commandLine
   * @return
   */
  public Command getCommand(String commandLine){
    if (commandLine.matches("^set .*$")){
      return new SetCommand(commandLine, memcache);
    }else if (commandLine.matches("^get .*$")){
      return new GetCommand(commandLine, memcache);
    }else if (commandLine.matches("^del .*$")){
      return new DeleteCommand(commandLine, memcache);
    }else if (commandLine.matches("^end$")){
      return new EndCommand(commandLine);
    }else{
      return new ErrorCommand(commandLine, ErrorCommand.ErrorType.ERROR);
    }
  }
}

/**
 * 刪除緩存指令
 */
public class DeleteCommand implements Command{

  private final String command;
  private final Cache<Item> cache;

  private String key;
  private boolean noReply;
  public DeleteCommand(final String command, final Cache<Item> cache){
    this.command = command;
    this.cache = cache;
    initCommand();
  }

  private void initCommand(){
    if (this.command.contains("noreply")){
      noReply = true;
    }
    String[] info = command.split(" ");
    key = info[1];
  }

  @Override
  public void execute(Reader reader, Writer writer) {
    BufferedWriter bfw = (BufferedWriter) writer;
    Item item = cache.delete(key);
    if (!noReply){
      try {
        if (item == null){
          bfw.write("NOT_FOUND\r\n");
        }else {
          bfw.write("DELETED\r\n");
        }
        bfw.flush();
      } catch (IOException e) {
        try {
          bfw.write("ERROR\r\n");
          bfw.flush();
        } catch (IOException e1) {
          e1.printStackTrace();
        }
        e.printStackTrace();
      }
    }


  }

  @Override
  public CommandType getType() {
    return CommandType.SEARCH;
  }
}

然后是實現(xiàn)內存服務器,為了支持先進先出功能,這里使用了LinkedTreeMap作為底層實現(xiàn),并且重寫了removeOldest方法。同時還使用CacheManager的后臺線程及時清除過期的緩存條目。

public class Memcache implements Cache<Item>{
  private Logger logger = Logger.getLogger(Memcache.class.getName());
  //利用LinkedHashMap實現(xiàn)LRU
  private static LinkedHashMap<String, Item> cache;
  private final int maxSize;
  //負載因子
  private final float DEFAULT_LOAD_FACTOR = 0.75f;
  public Memcache(final int maxSize){
    this.maxSize = maxSize;
    //確保cache不會在達到maxSize之后自動擴容
    int capacity = (int) Math.ceil(maxSize / DEFAULT_LOAD_FACTOR) + 1;

    this.cache = new LinkedHashMap<String, Item>(capacity, DEFAULT_LOAD_FACTOR, true){
      @Override
      protected boolean removeEldestEntry(Map.Entry<String,Item> eldest) {
        if (size() > maxSize){
          logger.info("緩存數(shù)量已經(jīng)達到上限,會刪除最近最少使用的條目");
        }
        return size() > maxSize;
      }
    };

    //實現(xiàn)同步訪問
    Collections.synchronizedMap(cache);
  }

  public synchronized boolean isFull(){
    return cache.size() >= maxSize;
  }

  @Override
  public Item get(String key) {
    Item item = cache.get(key);

    if (item == null){
      logger.info("緩存中key:" + key + "不存在");
      return null;
    }else if(item!=null && item.isExpired()){ //如果緩存過期則刪除并返回null
      logger.info("從緩存中讀取key:" + key + " value:" + item.getValue() + "已經(jīng)失效");
      cache.remove(key);
      return null;
    }

    logger.info("從緩存中讀取key:" + key + " value:" + item.getValue() + " 剩余有效時間" + item.remainTime());
    return item;
  }

  @Override
  public void set(String key, Item value) {
    logger.info("向緩存中寫入key:" + key + " value:" + value);
    cache.put(key, value);
  }

  @Override
  public Item delete(String key) {
    logger.info("從緩存中刪除key:" + key);
    return cache.remove(key);
  }

  @Override
  public int size(){
    return cache.size();
  }

  @Override
  public int capacity() {
    return maxSize;
  }

  @Override
  public Iterator<Map.Entry<String, Item>> iterator() {
    return cache.entrySet().iterator();
  }
}
/**
 * 緩存管理器
 * 后臺線程
 * 將cache中過期的緩存刪除
 */
public class CacheManager implements Runnable {

  private Logger logger = Logger.getLogger(CacheManager.class.getName());

  //緩存
  public Cache<Item> cache;

  public CacheManager(Cache<Item> cache){
    this.cache = cache;
  }


  @Override
  public void run() {
    while (true){
      Iterator<Map.Entry<String, Item>> itemIterator = cache.iterator();
      while (itemIterator.hasNext()){
        Map.Entry<String, Item> entry = itemIterator.next();
        Item item = entry.getValue();
        if(item.isExpired()){
          logger.info("key:" + entry.getKey() + " value" + item.getValue() + " 已經(jīng)過期,從數(shù)據(jù)庫中刪除");
          itemIterator.remove();
        }
      }

      try {
        //每隔5秒鐘再運行該后臺程序
        TimeUnit.SECONDS.sleep(5);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }

    }
  }
}

最后是實現(xiàn)一個多線程的Socket服務器,這里就是將ServerSocket綁定到一個接口,并且將accept到的Socket交給額外的線程處理。

/**
 * 服務器
 */
public class IOServer implements Server {
  private boolean stop;
  //端口號
  private final int port;
  //服務器線程
  private ServerSocket serverSocket;
  private final Logger logger = Logger.getLogger(IOServer.class.getName());
  //線程池,線程容量為maxConnection
  private final ExecutorService executorService;
  private final Cache<Item> cache;
  public IOServer(int port, int maxConnection, Cache<Item> cache){
    if (maxConnection<=0) throw new IllegalArgumentException("支持的最大連接數(shù)量必須為正整數(shù)");
    this.port = port;
    executorService = Executors.newFixedThreadPool(maxConnection);
    this.cache = cache;
  }

  @Override
  public void start() {
    try {
      serverSocket = new ServerSocket(port);
      logger.info("服務器在端口"+port+"上啟動");
      while (true){
        try {
          Socket socket = serverSocket.accept();
          logger.info("收到"+socket.getLocalAddress()+"的連接");
          executorService.submit(new SocketHandler(socket, cache));
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
    } catch (IOException e) {
      logger.log(Level.WARNING, "服務器即將關閉...");
      e.printStackTrace();
    } finally {
      executorService.shutdown();
      shutDown();
    }


  }

  /**
   * 服務器是否仍在運行
   * @return
   */
  public boolean isRunning() {
    return !serverSocket.isClosed();
  }
  /**
   * 停止服務器
   */
  public void shutDown(){
    try {
      if (serverSocket!=null){
        serverSocket.close();
      }
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
}
/**
 * 處理各個客戶端的連接
 * 在獲得end指令后關閉連接s
 */
public class SocketHandler implements Runnable{

  private static Logger logger = Logger.getLogger(SocketHandler.class.getName());

  private final Socket socket;

  private final Cache<Item> cache;

  private boolean finish;


  public SocketHandler(Socket s, Cache<Item> cache){
    this.socket = s;
    this.cache = cache;
  }

  @Override
  public void run() {
    try {
      //獲取socket輸入流
      final BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
      //獲取socket輸出流
      final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));

      CommandFactory commandFactory = CommandFactory.getInstance(cache);

      while (!finish){
        final String commandLine = reader.readLine();
        logger.info("ip:" + socket.getLocalAddress() + " 指令:" + commandLine);

        if (commandLine == null || commandLine.trim().isEmpty()) {
          continue;
        }

        //使用指令工廠獲取指令實例
        final Command command = commandFactory.getCommand(commandLine);
        command.execute(reader, writer);

        if (command.getType() == CommandType.END){
          logger.info("請求關閉連接");
          finish = true;
        }
      }
    } catch (IOException e) {
      e.printStackTrace();
      logger.info("關閉來自" + socket.getLocalAddress() + "的連接");
    } finally {
      try {
        if (socket != null){
          socket.close();
        }
      } catch (IOException e) {
        e.printStackTrace();
      }
    }
  }
}

項目地址請戳這里,如果覺得還不錯的話,希望能給個星哈><

參考資料

memcached官網(wǎng)
memcache協(xié)議

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持億速云。

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權內容。

AI