程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> C語言 >> C++ >> C++入門知識 >> 分治和hash-從海量數據大文件中查出某時間段內登陸超過阈值的ip地址

分治和hash-從海量數據大文件中查出某時間段內登陸超過阈值的ip地址

編輯:C++入門知識

一個很大的文件,例如10G,僅包含ip地址和訪問時間二列,格式如下:


 

 127.0.0.1   2013-07-22 14:00 
127.0.0.1   2013-07-22 14:02 
127.0.0.1   2013-07-22 14:03 
127.0.0.3   2013-07-22 14:03 
127.0.0.1   2013-07-22 14:04 
127.0.0.1   2013-07-22 14:05 
127.0.0.1   2013-07-22 14:06 
127.0.0.1   2013-07-22 14:07 
127.0.0.1   2013-07-22 14:08 
127.0.0.1   2013-07-22 14:09 
127.0.0.1   2013-07-22 14:10 
127.0.0.1   2013-07-22 14:11 
127.0.0.1   2013-07-22 14:12 
127.0.0.1   2013-07-22 14:13 
127.0.0.4   2013-07-22 14:13 
127.0.0.1   2013-07-22 14:15 
127.0.0.1   2013-07-22 14:16 
127.0.0.4   2013-07-22 14:17 
... ... 

127.0.0.1   2013-07-22 14:00
127.0.0.1   2013-07-22 14:02
127.0.0.1   2013-07-22 14:03
127.0.0.3   2013-07-22 14:03
127.0.0.1   2013-07-22 14:04
127.0.0.1   2013-07-22 14:05
127.0.0.1   2013-07-22 14:06
127.0.0.1   2013-07-22 14:07
127.0.0.1   2013-07-22 14:08
127.0.0.1   2013-07-22 14:09
127.0.0.1   2013-07-22 14:10
127.0.0.1   2013-07-22 14:11
127.0.0.1   2013-07-22 14:12
127.0.0.1   2013-07-22 14:13
127.0.0.4   2013-07-22 14:13
127.0.0.1   2013-07-22 14:15
127.0.0.1   2013-07-22 14:16
127.0.0.4   2013-07-22 14:17
... ...

從文件裡查出在5分鐘內連續登陸10次以上的ip地址集合並輸出。這類問題是一個很常見的應用,通常都是從大的log日志文件中找出有攻擊嫌疑的ip。

這類應用因為要處理分析的文件非常大,顯然不能將整個文件全部讀入內存,然後進行下一步工作。常見的比較成熟的解決方案有:分治+Hash,Bloom filter,2-Bitmap等。 這裡就使用第一種方式來解決。
下面是分治與hash的代碼


 

 public class DuplicateIP { 
    private String delimiter = " "; 
    private String FILE_PRE = "ip_"; 
     
    private int MAGIC = 10,BATCH_MAGIC = 500; 
    private String root = "/DuplicateIP/"; 
     
    private String filename = ""; 
     
    public DuplicateIP(final String filename) { 
        this.filename = filename; 
    } 
     
    /**
     * 將大文件拆分成較小的文件,進行預處理
     * @throws IOException
     */ 
    private void preProcess() throws IOException { 
        //Path newfile = FileSystems.getDefault().getPath(filename);  
        BufferedInputStream fis = new BufferedInputStream(new FileInputStream(new File(filename)));          
        // 用5M的緩沖讀取文本文件  
        BufferedReader reader = new BufferedReader(new InputStreamReader(fis,"utf-8"),5*1024*1024); 
         
        //假設文件是10G,那麼先根據hashcode拆成小文件,再進行讀寫判斷  
        //如果不拆分文件,將ip地址當做key,訪問時間當做value存到hashmap時,  
        //當來訪的ip地址足夠多的情況下,內存開銷吃不消  
//      List<Entity> entities = new ArrayList<Entity>();  
         
        //存放ip的hashcode->accessTimes集合  
        Map<String,List<String>> hashcodeMap = new HashMap<String,List<String>>(); 
        String line = ""; 
        int count = 0; 
        while((line = reader.readLine()) != null){ 
            String split[] = line.split(delimiter); 
            if(split != null && split.length >= 2){ 
                //根據ip的hashcode這樣拆分文件,拆分後的文件大小在1G上下波動  
                //極端情況是整個文件的ip地址全都相同,只有一個,那麼拆分後還是只有一個文件  
                int serial = split[0].trim().hashCode() % MAGIC; 
 
                String splitFilename = FILE_PRE + serial; 
                List<String> lines = hashcodeMap.get(splitFilename); 
                if(lines == null){ 
                    lines = new ArrayList<String>(); 
                     
                    hashcodeMap.put(splitFilename, lines); 
                } 
                lines.add(line); 
            } 
             
            count ++; 
            if(count > 0 && count % BATCH_MAGIC == 0){ 
                for(Map.Entry<String, List<String>> entry : hashcodeMap.entrySet()){   
                    //System.out.println(entry.getKey()+"--->"+entry.getValue());  
                    DuplicateUtils.appendFile(root + entry.getKey(), entry.getValue(), Charset.forName("UTF-8"));    
                }   
                //一次操作500之後清空,重新執行  
                hashcodeMap.clear(); 
            } 
        } 
         
        reader.close(); 
        fis.close(); 
    } 
     
    private boolean process() throws IOException{ 
        Path target = Paths.get(root); 
         
        //ip -> List<Date>  
        Map<String,List<Date>> resMap = new HashMap<String,List<Date>>(); 
        this.recurseFile(target,resMap); 
         
        for(Map.Entry<String, List<Date>> entry : resMap.entrySet()){ 
            System.out.println(entry.getKey()); 
            for(Date date : entry.getValue()){ 
                System.out.println(date); 
            }            
        } 
         
        return true; 
    } 
     
    /**
     * 遞歸執行,將5分鐘內訪問超過阈值的ip找出來
     * 
     * @param parent
     * @return
     * @throws IOException 
     */ 
    private void recurseFile(Path parent,Map<String,List<Date>> resMap) throws IOException{ 
        //Path target = Paths.get(dir);  
        if(!Files.exists(parent) || !Files.isDirectory(parent)){ 
            return; 
        } 
         
        Iterator<Path> targets = parent.iterator(); 
        for(;targets.hasNext();){ 
            Path path = targets.next(); 
            if(Files.isDirectory(parent)){ 
                //如果還是目錄,遞歸  
                recurseFile(path.toAbsolutePath(),resMap); 
            }else { 
                //將一個文件中所有的行讀上來  
                List<String> lines = Files.readAllLines(path, Charset.forName("UTF-8")); 
                judgeAndcollection(lines,resMap); 
            }            
        } 
    } 
     
    /**
     * 根據從較小文件讀上來的每行ip accessTimes進行判斷符合條件的ip
     * 並放入resMap
     * 
     * @param lines
     * @param resMap
     */ 
    private void judgeAndcollection(List<String> lines,Map<String,List<Date>> resMap) { 
        if(lines != null){ 
            //ip->List<String>accessTimes  
            Map<String,List<String>> judgeMap = new HashMap<String,List<String>>(); 
            for(String line : lines){ 
                line = line.trim(); 
                int space = line.indexOf(delimiter); 
                String ip = line.substring(0, space); 
                 
                List<String> accessTimes = judgeMap.get(ip); 
                if(accessTimes == null){ 
                    accessTimes = new ArrayList<String>();                     
                } 
                accessTimes.add(line.substring(space + 1).trim()); 
                judgeMap.put(ip, accessTimes); 
            } 
             
            if(judgeMap.size() == 0){ 
                return; 
            } 
             
            for(Map.Entry<String, List<String>> entry : judgeMap.entrySet()){ 
                List<String> acessTimes = entry.getValue(); 
                //相同ip,先判斷整體大於10個  
                if(acessTimes != null && acessTimes.size() >= MAGIC){ 
                    //開始判斷在List集合中,5分鐘內訪問超過MAGIC=10  
                    List<Date> attackTimes = DuplicateUtils.attackList(acessTimes, 5 * 60 * 1000, MAGIC); 
                    if(attackTimes != null){ 
                        resMap.put(entry.getKey(), attackTimes); 
                    } 
                } 
            } 
        } 
    } 
     
    /**
     * @param args
     */ 
    public static void main(String[] args) { 
        String filename = "/DuplicateIP/log.txt"; 
        DuplicateIP dip = new DuplicateIP(filename); 
        try { 
            dip.preProcess(); 
            dip.process(); 
        } catch (IOException e) { 
            e.printStackTrace(); 
        } 
    } 
} 

public class DuplicateIP {
 private String delimiter = " ";
 private String FILE_PRE = "ip_";
 
 private int MAGIC = 10,BATCH_MAGIC = 500;
 private String root = "/DuplicateIP/";
 
 private String filename = "";
 
 public DuplicateIP(final String filename) {
  this.filename = filename;
 }
 
 /**
  * 將大文件拆分成較小的文件,進行預處理
  * @throws IOException
  */
 private void preProcess() throws IOException {
  //Path newfile = FileSystems.getDefault().getPath(filename);
  BufferedInputStream fis = new BufferedInputStream(new FileInputStream(new File(filename)));   
  // 用5M的緩沖讀取文本文件
  BufferedReader reader = new BufferedReader(new InputStreamReader(fis,"utf-8"),5*1024*1024);
  
  //假設文件是10G,那麼先根據hashcode拆成小文件,再進行讀寫判斷
  //如果不拆分文件,將ip地址當做key,訪問時間當做value存到hashmap時,
  //當來訪的ip地址足夠多的情況下,內存開銷吃不消
//  List<Entity> entities = new ArrayList<Entity>();
  
  //存放ip的hashcode->accessTimes集合
  Map<String,List<String>> hashcodeMap = new HashMap<String,List<String>>();
  String line = "";
  int count = 0;
  while((line = reader.readLine()) != null){
   String split[] = line.split(delimiter);
   if(split != null && split.length >= 2){
    //根據ip的hashcode這樣拆分文件,拆分後的文件大小在1G上下波動
    //極端情況是整個文件的ip地址全都相同,只有一個,那麼拆分後還是只有一個文件
    int serial = split[0].trim().hashCode() % MAGIC;

    String splitFilename = FILE_PRE + serial;
    List<String> lines = hashcodeMap.get(splitFilename);
    if(lines == null){
     lines = new ArrayList<String>();
     
     hashcodeMap.put(splitFilename, lines);
    }
    lines.add(line);
   }
   
   count ++;
   if(count > 0 && count % BATCH_MAGIC == 0){
       for(Map.Entry<String, List<String>> entry : hashcodeMap.entrySet()){ 
           //System.out.println(entry.getKey()+"--->"+entry.getValue());
        DuplicateUtils.appendFile(root + entry.getKey(), entry.getValue(), Charset.forName("UTF-8")); 
       } 
    //一次操作500之後清空,重新執行
    hashcodeMap.clear();
   }
  }
  
  reader.close();
  fis.close();
 }
 
 private boolean process() throws IOException{
  Path target = Paths.get(root);
  
  //ip -> List<Date>
  Map<String,List<Date>> resMap = new HashMap<String,List<Date>>();
  this.recurseFile(target,resMap);
  
  for(Map.Entry<String, List<Date>> entry : resMap.entrySet()){
   System.out.println(entry.getKey());
   for(Date date : entry.getValue()){
    System.out.println(date);
   }   
  }
  
  return true;
 }
 
 /**
  * 遞歸執行,將5分鐘內訪問超過阈值的ip找出來
  *
  * @param parent
  * @return
  * @throws IOException
  */
 private void recurseFile(Path parent,Map<String,List<Date>> resMap) throws IOException{
  //Path target = Paths.get(dir);
  if(!Files.exists(parent) || !Files.isDirectory(parent)){
   return;
  }
  
  Iterator<Path> targets = parent.iterator();
  for(;targets.hasNext();){
   Path path = targets.next();
   if(Files.isDirectory(parent)){
    //如果還是目錄,遞歸
    recurseFile(path.toAbsolutePath(),resMap);
   }else {
    //將一個文件中所有的行讀上來
    List<String> lines = Files.readAllLines(path, Charset.forName("UTF-8"));
    judgeAndcollection(lines,resMap);
   }   
  }
 }
 
 /**
  * 根據從較小文件讀上來的每行ip accessTimes進行判斷符合條件的ip
  * 並放入resMap
  *
  * @param lines
  * @param resMap
  */
 private void judgeAndcollection(List<String> lines,Map<String,List<Date>> resMap) {
  if(lines != null){
   //ip->List<String>accessTimes
   Map<String,List<String>> judgeMap = new HashMap<String,List<String>>();
   for(String line : lines){
    line = line.trim();
    int space = line.indexOf(delimiter);
    String ip = line.substring(0, space);
    
    List<String> accessTimes = judgeMap.get(ip);
    if(accessTimes == null){
     accessTimes = new ArrayList<String>();     
    }
    accessTimes.add(line.substring(space + 1).trim());
    judgeMap.put(ip, accessTimes);
   }
   
   if(judgeMap.size() == 0){
    return;
   }
   
   for(Map.Entry<String, List<String>> entry : judgeMap.entrySet()){
    List<String> acessTimes = entry.getValue();
    //相同ip,先判斷整體大於10個
    if(acessTimes != null && acessTimes.size() >= MAGIC){
     //開始判斷在List集合中,5分鐘內訪問超過MAGIC=10
     List<Date> attackTimes = DuplicateUtils.attackList(acessTimes, 5 * 60 * 1000, MAGIC);
     if(attackTimes != null){
      resMap.put(entry.getKey(), attackTimes);
     }
    }
   }
  }
 }
 
 /**
  * @param args
  */
 public static void main(String[] args) {
  String filename = "/DuplicateIP/log.txt";
  DuplicateIP dip = new DuplicateIP(filename);
  try {
   dip.preProcess();
   dip.process();
  } catch (IOException e) {
   e.printStackTrace();
  }
 }
}

下面是工具類,提供了一些文件讀寫及查找的功能


 

public class DuplicateUtils { 
    /**
     * 根據給出的數據,往給定的文件形參中追加一行或者幾行數據
     * 
     * @param file
     * @throws IOException 
     */ 
    public static Path appendFile(String splitFilename, Iterable<? extends CharSequence> accessTimes,Charset cs) throws IOException { 
        if(accessTimes != null){ 
            Path target = Paths.get(splitFilename); 
            if(target == null){ 
                createFile(splitFilename); 
            } 
            return Files.write(target, accessTimes, cs);//, options)  
        } 
         
        return null; 
    } 
     
    /**
     * 創建文件
     * @throws IOException 
     */ 
    public static void createFile(String splitFilename) throws IOException { 
        Path target = Paths.get(splitFilename); 
        Set<PosixFilePermission> perms = PosixFilePermissions.fromString("rw-rw-rw-"); 
        FileAttribute<Set<PosixFilePermission>> attr = PosixFilePermissions.asFileAttribute(perms); 
        Files.createFile(target, attr); 
    } 
     
    public static Date stringToDate(String dateStr,String dateStyle){ 
        if(dateStr == null || "".equals(dateStr)) 
            return null; 
         
        DateFormat format = new SimpleDateFormat(dateStyle);//"yyyy-MM-dd hh:mm:ss");   
        try { 
            return format.parse(dateStr); 
        } catch (ParseException e) { 
            e.printStackTrace(); 
            return null; 
        }   
    } 
     
    public static String dateToString(Date date,String dateStyle){ 
        if(date == null) 
            return null; 
         
        DateFormat format = new SimpleDateFormat(dateStyle);  
        return format.format(date); 
    } 
     
    /**
     * 根據間隔時間,判斷列表中的數據是否已經大於magic給出的魔法數
     * 返回true or false
     * 
     * @param dates
     * @param intervalDate
     * @param magic
     * @return
     * @throws ParseException 
     */ 
    public static boolean attack(List<String> dateStrs,long intervalDate,int magic) { 
        if(dateStrs == null || dateStrs.size() < magic){ 
            return false; 
        } 
         
        List<Date> dates = new ArrayList<Date>(); 
        for(String date : dateStrs){ 
            if(date != null && !"".equals(date)) 
                dates.add(stringToDate(date,"yyyy-MM-dd hh:mm:ss")); 
        } 
         
        Collections.sort(dates); 
        return judgeAttack(dates,intervalDate,magic); 
    } 
     
    public static boolean judgeAttack(List<Date> sequenceDates,long intervalDate,int magic){ 
        if(sequenceDates == null || sequenceDates.size() < magic){ 
            return false; 
        } 
         
        for(int x = 0; x < sequenceDates.size() && x <= sequenceDates.size() - magic;x++){ 
            Date dateAfter5 = new Date(sequenceDates.get(x).getTime() + intervalDate); 
             
            int count = 1; 
            for(int i = x + 1;i< sequenceDates.size();i++){ 
                Date compareDate = sequenceDates.get(i); 
             
                if(compareDate.before(dateAfter5)) 
                    count ++ ; 
                else  
                    break; 
            } 
             
            if(count >= magic) 
                return true; 
        } 
 
        return false; 
    } 
     
    /**
     * 判斷在間隔時間內,是否有大於magic的上限的數據集合,
     * 如果有,則返回滿足條件的集合
     * 如果找不到滿足條件的,就返回null
     * 
     * @param sequenceDates 已經按照時間順序排序了的數組
     * @param intervalDate
     * @param magic
     * @return
     */ 
    public static List<Date> attackTimes(List<Date> sequenceDates,long intervalDate,int magic){ 
        if(sequenceDates == null || sequenceDates.size() < magic){ 
            return null; 
        } 
         
        List<Date> res = new ArrayList<Date>(); 
        for(int x = 0; x < sequenceDates.size() && x <= sequenceDates.size() - magic;x++){ 
            Date souceDate = sequenceDates.get(x);           
            Date dateAfter5 = new Date(souceDate.getTime() + intervalDate); 
             
            res.add(souceDate); 
             
            for(int i = x + 1;i< sequenceDates.size();i++){ 
                Date compareDate = sequenceDates.get(i); 
             
                if(compareDate.before(dateAfter5)){ 
                    res.add(compareDate); 
                }else  
                    break; 
            } 
             
            if(res.size() >= magic) 
                return res; 
            else  
                res.clear(); 
        } 
 
        return null; 
    } 
 
    public static List<Date> attackList(List<String> dateStrs,long intervalDate,int magic){ 
        if(dateStrs == null || dateStrs.size() < magic){ 
            return null; 
        } 
         
        List<Date> dates = new ArrayList<Date>(); 
        for(String date : dateStrs){ 
            if(date != null && !"".equals(date)) 
                dates.add(stringToDate(date,"yyyy-MM-dd hh:mm:ss")); 
        } 
         
        Collections.sort(dates); 
        return attackTimes(dates,intervalDate,magic); 
    } 
} 

public class DuplicateUtils {
 /**
  * 根據給出的數據,往給定的文件形參中追加一行或者幾行數據
  *
  * @param file
  * @throws IOException
  */
 public static Path appendFile(String splitFilename, Iterable<? extends CharSequence> accessTimes,Charset cs) throws IOException {
  if(accessTimes != null){
   Path target = Paths.get(splitFilename);
   if(target == null){
    createFile(splitFilename);
   }
   return Files.write(target, accessTimes, cs);//, options)
  }
  
  return null;
 }
 
 /**
  * 創建文件
  * @throws IOException
  */
 public static void createFile(String splitFilename) throws IOException {
  Path target = Paths.get(splitFilename);
  Set<PosixFilePermission> perms = PosixFilePermissions.fromString("rw-rw-rw-");
  FileAttribute<Set<PosixFilePermission>> attr = PosixFilePermissions.asFileAttribute(perms);
  Files.createFile(target, attr);
 }
 
 public static Date stringToDate(String dateStr,String dateStyle){
  if(dateStr == null || "".equals(dateStr))
   return null;
  
  DateFormat format = new SimpleDateFormat(dateStyle);//"yyyy-MM-dd hh:mm:ss");
  try {
   return format.parse(dateStr);
  } catch (ParseException e) {
   e.printStackTrace();
   return null;
  } 
 }
 
 public static String dateToString(Date date,String dateStyle){
  if(date == null)
   return null;
  
  DateFormat format = new SimpleDateFormat(dateStyle);
  return format.format(date);
 }
 
 /**
  * 根據間隔時間,判斷列表中的數據是否已經大於magic給出的魔法數
  * 返回true or false
  *
  * @param dates
  * @param intervalDate
  * @param magic
  * @return
  * @throws ParseException
  */
 public static boolean attack(List<String> dateStrs,long intervalDate,int magic) {
  if(dateStrs == null || dateStrs.size() < magic){
   return false;
  }
  
  List<Date> dates = new ArrayList<Date>();
  for(String date : dateStrs){
   if(date != null && !"".equals(date))
    dates.add(stringToDate(date,"yyyy-MM-dd hh:mm:ss"));
  }
  
  Collections.sort(dates);
  return judgeAttack(dates,intervalDate,magic);
 }
 
 public static boolean judgeAttack(List<Date> sequenceDates,long intervalDate,int magic){
  if(sequenceDates == null || sequenceDates.size() < magic){
   return false;
  }
  
  for(int x = 0; x < sequenceDates.size() && x <= sequenceDates.size() - magic;x++){
   Date dateAfter5 = new Date(sequenceDates.get(x).getTime() + intervalDate);
   
   int count = 1;
   for(int i = x + 1;i< sequenceDates.size();i++){
    Date compareDate = sequenceDates.get(i);
   
    if(compareDate.before(dateAfter5))
     count ++ ;
    else
     break;
   }
   
   if(count >= magic)
    return true;
  }

  return false;
 }
 
 /**
  * 判斷在間隔時間內,是否有大於magic的上限的數據集合,
  * 如果有,則返回滿足條件的集合
  * 如果找不到滿足條件的,就返回null
  *
  * @param sequenceDates 已經按照時間順序排序了的數組
  * @param intervalDate
  * @param magic
  * @return
  */
 public static List<Date> attackTimes(List<Date> sequenceDates,long intervalDate,int magic){
  if(sequenceDates == null || sequenceDates.size() < magic){
   return null;
  }
  
  List<Date> res = new ArrayList<Date>();
  for(int x = 0; x < sequenceDates.size() && x <= sequenceDates.size() - magic;x++){
   Date souceDate = sequenceDates.get(x);   
   Date dateAfter5 = new Date(souceDate.getTime() + intervalDate);
   
   res.add(souceDate);
   
   for(int i = x + 1;i< sequenceDates.size();i++){
    Date compareDate = sequenceDates.get(i);
   
    if(compareDate.before(dateAfter5)){
     res.add(compareDate);
    }else
     break;
   }
   
   if(res.size() >= magic)
    return res;
   else
    res.clear();
  }

  return null;
 }

 public static List<Date> attackList(List<String> dateStrs,long intervalDate,int magic){
  if(dateStrs == null || dateStrs.size() < magic){
   return null;
  }
  
  List<Date> dates = new ArrayList<Date>();
  for(String date : dateStrs){
   if(date != null && !"".equals(date))
    dates.add(stringToDate(date,"yyyy-MM-dd hh:mm:ss"));
  }
  
  Collections.sort(dates);
  return attackTimes(dates,intervalDate,magic);
 }
}

 

 

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved