一個很大的文件,例如10G,僅包含ip地址和訪問時間二列,格式如下:
127.0.0.1 2013-07-22 14:00 127.0.0.1 2013-07-22 14:02 127.0.0.1 2013-07-22 14:03 127.0.0.3 2013-07-22 14:03 127.0.0.1 2013-07-22 14:04 127.0.0.1 2013-07-22 14:05 127.0.0.1 2013-07-22 14:06 127.0.0.1 2013-07-22 14:07 127.0.0.1 2013-07-22 14:08 127.0.0.1 2013-07-22 14:09 127.0.0.1 2013-07-22 14:10 127.0.0.1 2013-07-22 14:11 127.0.0.1 2013-07-22 14:12 127.0.0.1 2013-07-22 14:13 127.0.0.4 2013-07-22 14:13 127.0.0.1 2013-07-22 14:15 127.0.0.1 2013-07-22 14:16 127.0.0.4 2013-07-22 14:17 ... ... 127.0.0.1 2013-07-22 14:00 127.0.0.1 2013-07-22 14:02 127.0.0.1 2013-07-22 14:03 127.0.0.3 2013-07-22 14:03 127.0.0.1 2013-07-22 14:04 127.0.0.1 2013-07-22 14:05 127.0.0.1 2013-07-22 14:06 127.0.0.1 2013-07-22 14:07 127.0.0.1 2013-07-22 14:08 127.0.0.1 2013-07-22 14:09 127.0.0.1 2013-07-22 14:10 127.0.0.1 2013-07-22 14:11 127.0.0.1 2013-07-22 14:12 127.0.0.1 2013-07-22 14:13 127.0.0.4 2013-07-22 14:13 127.0.0.1 2013-07-22 14:15 127.0.0.1 2013-07-22 14:16 127.0.0.4 2013-07-22 14:17 ... ...
從文件裡查出在5分鐘內連續登陸10次以上的ip地址集合並輸出。這類問題是一個很常見的應用,通常都是從大的log日志文件中找出有攻擊嫌疑的ip。
這類應用因為要處理分析的文件非常大,顯然不能將整個文件全部讀入內存,然後進行下一步工作。常見的比較成熟的解決方案有:分治+Hash,Bloom filter,2-Bitmap等。 這裡就使用第一種方式來解決。
下面是分治與hash的代碼
public class DuplicateIP {
private String delimiter = " ";
private String FILE_PRE = "ip_";
private int MAGIC = 10,BATCH_MAGIC = 500;
private String root = "/DuplicateIP/";
private String filename = "";
public DuplicateIP(final String filename) {
this.filename = filename;
}
/**
* 將大文件拆分成較小的文件,進行預處理
* @throws IOException
*/
private void preProcess() throws IOException {
//Path newfile = FileSystems.getDefault().getPath(filename);
BufferedInputStream fis = new BufferedInputStream(new FileInputStream(new File(filename)));
// 用5M的緩沖讀取文本文件
BufferedReader reader = new BufferedReader(new InputStreamReader(fis,"utf-8"),5*1024*1024);
//假設文件是10G,那麼先根據hashcode拆成小文件,再進行讀寫判斷
//如果不拆分文件,將ip地址當做key,訪問時間當做value存到hashmap時,
//當來訪的ip地址足夠多的情況下,內存開銷吃不消
// List<Entity> entities = new ArrayList<Entity>();
//存放ip的hashcode->accessTimes集合
Map<String,List<String>> hashcodeMap = new HashMap<String,List<String>>();
String line = "";
int count = 0;
while((line = reader.readLine()) != null){
String split[] = line.split(delimiter);
if(split != null && split.length >= 2){
//根據ip的hashcode這樣拆分文件,拆分後的文件大小在1G上下波動
//極端情況是整個文件的ip地址全都相同,只有一個,那麼拆分後還是只有一個文件
int serial = split[0].trim().hashCode() % MAGIC;
String splitFilename = FILE_PRE + serial;
List<String> lines = hashcodeMap.get(splitFilename);
if(lines == null){
lines = new ArrayList<String>();
hashcodeMap.put(splitFilename, lines);
}
lines.add(line);
}
count ++;
if(count > 0 && count % BATCH_MAGIC == 0){
for(Map.Entry<String, List<String>> entry : hashcodeMap.entrySet()){
//System.out.println(entry.getKey()+"--->"+entry.getValue());
DuplicateUtils.appendFile(root + entry.getKey(), entry.getValue(), Charset.forName("UTF-8"));
}
//一次操作500之後清空,重新執行
hashcodeMap.clear();
}
}
reader.close();
fis.close();
}
private boolean process() throws IOException{
Path target = Paths.get(root);
//ip -> List<Date>
Map<String,List<Date>> resMap = new HashMap<String,List<Date>>();
this.recurseFile(target,resMap);
for(Map.Entry<String, List<Date>> entry : resMap.entrySet()){
System.out.println(entry.getKey());
for(Date date : entry.getValue()){
System.out.println(date);
}
}
return true;
}
/**
* 遞歸執行,將5分鐘內訪問超過阈值的ip找出來
*
* @param parent
* @return
* @throws IOException
*/
private void recurseFile(Path parent,Map<String,List<Date>> resMap) throws IOException{
//Path target = Paths.get(dir);
if(!Files.exists(parent) || !Files.isDirectory(parent)){
return;
}
Iterator<Path> targets = parent.iterator();
for(;targets.hasNext();){
Path path = targets.next();
if(Files.isDirectory(parent)){
//如果還是目錄,遞歸
recurseFile(path.toAbsolutePath(),resMap);
}else {
//將一個文件中所有的行讀上來
List<String> lines = Files.readAllLines(path, Charset.forName("UTF-8"));
judgeAndcollection(lines,resMap);
}
}
}
/**
* 根據從較小文件讀上來的每行ip accessTimes進行判斷符合條件的ip
* 並放入resMap
*
* @param lines
* @param resMap
*/
private void judgeAndcollection(List<String> lines,Map<String,List<Date>> resMap) {
if(lines != null){
//ip->List<String>accessTimes
Map<String,List<String>> judgeMap = new HashMap<String,List<String>>();
for(String line : lines){
line = line.trim();
int space = line.indexOf(delimiter);
String ip = line.substring(0, space);
List<String> accessTimes = judgeMap.get(ip);
if(accessTimes == null){
accessTimes = new ArrayList<String>();
}
accessTimes.add(line.substring(space + 1).trim());
judgeMap.put(ip, accessTimes);
}
if(judgeMap.size() == 0){
return;
}
for(Map.Entry<String, List<String>> entry : judgeMap.entrySet()){
List<String> acessTimes = entry.getValue();
//相同ip,先判斷整體大於10個
if(acessTimes != null && acessTimes.size() >= MAGIC){
//開始判斷在List集合中,5分鐘內訪問超過MAGIC=10
List<Date> attackTimes = DuplicateUtils.attackList(acessTimes, 5 * 60 * 1000, MAGIC);
if(attackTimes != null){
resMap.put(entry.getKey(), attackTimes);
}
}
}
}
}
/**
* @param args
*/
public static void main(String[] args) {
String filename = "/DuplicateIP/log.txt";
DuplicateIP dip = new DuplicateIP(filename);
try {
dip.preProcess();
dip.process();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public class DuplicateIP {
private String delimiter = " ";
private String FILE_PRE = "ip_";
private int MAGIC = 10,BATCH_MAGIC = 500;
private String root = "/DuplicateIP/";
private String filename = "";
public DuplicateIP(final String filename) {
this.filename = filename;
}
/**
* 將大文件拆分成較小的文件,進行預處理
* @throws IOException
*/
private void preProcess() throws IOException {
//Path newfile = FileSystems.getDefault().getPath(filename);
BufferedInputStream fis = new BufferedInputStream(new FileInputStream(new File(filename)));
// 用5M的緩沖讀取文本文件
BufferedReader reader = new BufferedReader(new InputStreamReader(fis,"utf-8"),5*1024*1024);
//假設文件是10G,那麼先根據hashcode拆成小文件,再進行讀寫判斷
//如果不拆分文件,將ip地址當做key,訪問時間當做value存到hashmap時,
//當來訪的ip地址足夠多的情況下,內存開銷吃不消
// List<Entity> entities = new ArrayList<Entity>();
//存放ip的hashcode->accessTimes集合
Map<String,List<String>> hashcodeMap = new HashMap<String,List<String>>();
String line = "";
int count = 0;
while((line = reader.readLine()) != null){
String split[] = line.split(delimiter);
if(split != null && split.length >= 2){
//根據ip的hashcode這樣拆分文件,拆分後的文件大小在1G上下波動
//極端情況是整個文件的ip地址全都相同,只有一個,那麼拆分後還是只有一個文件
int serial = split[0].trim().hashCode() % MAGIC;
String splitFilename = FILE_PRE + serial;
List<String> lines = hashcodeMap.get(splitFilename);
if(lines == null){
lines = new ArrayList<String>();
hashcodeMap.put(splitFilename, lines);
}
lines.add(line);
}
count ++;
if(count > 0 && count % BATCH_MAGIC == 0){
for(Map.Entry<String, List<String>> entry : hashcodeMap.entrySet()){
//System.out.println(entry.getKey()+"--->"+entry.getValue());
DuplicateUtils.appendFile(root + entry.getKey(), entry.getValue(), Charset.forName("UTF-8"));
}
//一次操作500之後清空,重新執行
hashcodeMap.clear();
}
}
reader.close();
fis.close();
}
private boolean process() throws IOException{
Path target = Paths.get(root);
//ip -> List<Date>
Map<String,List<Date>> resMap = new HashMap<String,List<Date>>();
this.recurseFile(target,resMap);
for(Map.Entry<String, List<Date>> entry : resMap.entrySet()){
System.out.println(entry.getKey());
for(Date date : entry.getValue()){
System.out.println(date);
}
}
return true;
}
/**
* 遞歸執行,將5分鐘內訪問超過阈值的ip找出來
*
* @param parent
* @return
* @throws IOException
*/
private void recurseFile(Path parent,Map<String,List<Date>> resMap) throws IOException{
//Path target = Paths.get(dir);
if(!Files.exists(parent) || !Files.isDirectory(parent)){
return;
}
Iterator<Path> targets = parent.iterator();
for(;targets.hasNext();){
Path path = targets.next();
if(Files.isDirectory(parent)){
//如果還是目錄,遞歸
recurseFile(path.toAbsolutePath(),resMap);
}else {
//將一個文件中所有的行讀上來
List<String> lines = Files.readAllLines(path, Charset.forName("UTF-8"));
judgeAndcollection(lines,resMap);
}
}
}
/**
* 根據從較小文件讀上來的每行ip accessTimes進行判斷符合條件的ip
* 並放入resMap
*
* @param lines
* @param resMap
*/
private void judgeAndcollection(List<String> lines,Map<String,List<Date>> resMap) {
if(lines != null){
//ip->List<String>accessTimes
Map<String,List<String>> judgeMap = new HashMap<String,List<String>>();
for(String line : lines){
line = line.trim();
int space = line.indexOf(delimiter);
String ip = line.substring(0, space);
List<String> accessTimes = judgeMap.get(ip);
if(accessTimes == null){
accessTimes = new ArrayList<String>();
}
accessTimes.add(line.substring(space + 1).trim());
judgeMap.put(ip, accessTimes);
}
if(judgeMap.size() == 0){
return;
}
for(Map.Entry<String, List<String>> entry : judgeMap.entrySet()){
List<String> acessTimes = entry.getValue();
//相同ip,先判斷整體大於10個
if(acessTimes != null && acessTimes.size() >= MAGIC){
//開始判斷在List集合中,5分鐘內訪問超過MAGIC=10
List<Date> attackTimes = DuplicateUtils.attackList(acessTimes, 5 * 60 * 1000, MAGIC);
if(attackTimes != null){
resMap.put(entry.getKey(), attackTimes);
}
}
}
}
}
/**
* @param args
*/
public static void main(String[] args) {
String filename = "/DuplicateIP/log.txt";
DuplicateIP dip = new DuplicateIP(filename);
try {
dip.preProcess();
dip.process();
} catch (IOException e) {
e.printStackTrace();
}
}
}
下面是工具類,提供了一些文件讀寫及查找的功能
public class DuplicateUtils {
/**
* 根據給出的數據,往給定的文件形參中追加一行或者幾行數據
*
* @param file
* @throws IOException
*/
public static Path appendFile(String splitFilename, Iterable<? extends CharSequence> accessTimes,Charset cs) throws IOException {
if(accessTimes != null){
Path target = Paths.get(splitFilename);
if(target == null){
createFile(splitFilename);
}
return Files.write(target, accessTimes, cs);//, options)
}
return null;
}
/**
* 創建文件
* @throws IOException
*/
public static void createFile(String splitFilename) throws IOException {
Path target = Paths.get(splitFilename);
Set<PosixFilePermission> perms = PosixFilePermissions.fromString("rw-rw-rw-");
FileAttribute<Set<PosixFilePermission>> attr = PosixFilePermissions.asFileAttribute(perms);
Files.createFile(target, attr);
}
public static Date stringToDate(String dateStr,String dateStyle){
if(dateStr == null || "".equals(dateStr))
return null;
DateFormat format = new SimpleDateFormat(dateStyle);//"yyyy-MM-dd hh:mm:ss");
try {
return format.parse(dateStr);
} catch (ParseException e) {
e.printStackTrace();
return null;
}
}
public static String dateToString(Date date,String dateStyle){
if(date == null)
return null;
DateFormat format = new SimpleDateFormat(dateStyle);
return format.format(date);
}
/**
* 根據間隔時間,判斷列表中的數據是否已經大於magic給出的魔法數
* 返回true or false
*
* @param dates
* @param intervalDate
* @param magic
* @return
* @throws ParseException
*/
public static boolean attack(List<String> dateStrs,long intervalDate,int magic) {
if(dateStrs == null || dateStrs.size() < magic){
return false;
}
List<Date> dates = new ArrayList<Date>();
for(String date : dateStrs){
if(date != null && !"".equals(date))
dates.add(stringToDate(date,"yyyy-MM-dd hh:mm:ss"));
}
Collections.sort(dates);
return judgeAttack(dates,intervalDate,magic);
}
public static boolean judgeAttack(List<Date> sequenceDates,long intervalDate,int magic){
if(sequenceDates == null || sequenceDates.size() < magic){
return false;
}
for(int x = 0; x < sequenceDates.size() && x <= sequenceDates.size() - magic;x++){
Date dateAfter5 = new Date(sequenceDates.get(x).getTime() + intervalDate);
int count = 1;
for(int i = x + 1;i< sequenceDates.size();i++){
Date compareDate = sequenceDates.get(i);
if(compareDate.before(dateAfter5))
count ++ ;
else
break;
}
if(count >= magic)
return true;
}
return false;
}
/**
* 判斷在間隔時間內,是否有大於magic的上限的數據集合,
* 如果有,則返回滿足條件的集合
* 如果找不到滿足條件的,就返回null
*
* @param sequenceDates 已經按照時間順序排序了的數組
* @param intervalDate
* @param magic
* @return
*/
public static List<Date> attackTimes(List<Date> sequenceDates,long intervalDate,int magic){
if(sequenceDates == null || sequenceDates.size() < magic){
return null;
}
List<Date> res = new ArrayList<Date>();
for(int x = 0; x < sequenceDates.size() && x <= sequenceDates.size() - magic;x++){
Date souceDate = sequenceDates.get(x);
Date dateAfter5 = new Date(souceDate.getTime() + intervalDate);
res.add(souceDate);
for(int i = x + 1;i< sequenceDates.size();i++){
Date compareDate = sequenceDates.get(i);
if(compareDate.before(dateAfter5)){
res.add(compareDate);
}else
break;
}
if(res.size() >= magic)
return res;
else
res.clear();
}
return null;
}
public static List<Date> attackList(List<String> dateStrs,long intervalDate,int magic){
if(dateStrs == null || dateStrs.size() < magic){
return null;
}
List<Date> dates = new ArrayList<Date>();
for(String date : dateStrs){
if(date != null && !"".equals(date))
dates.add(stringToDate(date,"yyyy-MM-dd hh:mm:ss"));
}
Collections.sort(dates);
return attackTimes(dates,intervalDate,magic);
}
}
public class DuplicateUtils {
/**
* 根據給出的數據,往給定的文件形參中追加一行或者幾行數據
*
* @param file
* @throws IOException
*/
public static Path appendFile(String splitFilename, Iterable<? extends CharSequence> accessTimes,Charset cs) throws IOException {
if(accessTimes != null){
Path target = Paths.get(splitFilename);
if(target == null){
createFile(splitFilename);
}
return Files.write(target, accessTimes, cs);//, options)
}
return null;
}
/**
* 創建文件
* @throws IOException
*/
public static void createFile(String splitFilename) throws IOException {
Path target = Paths.get(splitFilename);
Set<PosixFilePermission> perms = PosixFilePermissions.fromString("rw-rw-rw-");
FileAttribute<Set<PosixFilePermission>> attr = PosixFilePermissions.asFileAttribute(perms);
Files.createFile(target, attr);
}
public static Date stringToDate(String dateStr,String dateStyle){
if(dateStr == null || "".equals(dateStr))
return null;
DateFormat format = new SimpleDateFormat(dateStyle);//"yyyy-MM-dd hh:mm:ss");
try {
return format.parse(dateStr);
} catch (ParseException e) {
e.printStackTrace();
return null;
}
}
public static String dateToString(Date date,String dateStyle){
if(date == null)
return null;
DateFormat format = new SimpleDateFormat(dateStyle);
return format.format(date);
}
/**
* 根據間隔時間,判斷列表中的數據是否已經大於magic給出的魔法數
* 返回true or false
*
* @param dates
* @param intervalDate
* @param magic
* @return
* @throws ParseException
*/
public static boolean attack(List<String> dateStrs,long intervalDate,int magic) {
if(dateStrs == null || dateStrs.size() < magic){
return false;
}
List<Date> dates = new ArrayList<Date>();
for(String date : dateStrs){
if(date != null && !"".equals(date))
dates.add(stringToDate(date,"yyyy-MM-dd hh:mm:ss"));
}
Collections.sort(dates);
return judgeAttack(dates,intervalDate,magic);
}
public static boolean judgeAttack(List<Date> sequenceDates,long intervalDate,int magic){
if(sequenceDates == null || sequenceDates.size() < magic){
return false;
}
for(int x = 0; x < sequenceDates.size() && x <= sequenceDates.size() - magic;x++){
Date dateAfter5 = new Date(sequenceDates.get(x).getTime() + intervalDate);
int count = 1;
for(int i = x + 1;i< sequenceDates.size();i++){
Date compareDate = sequenceDates.get(i);
if(compareDate.before(dateAfter5))
count ++ ;
else
break;
}
if(count >= magic)
return true;
}
return false;
}
/**
* 判斷在間隔時間內,是否有大於magic的上限的數據集合,
* 如果有,則返回滿足條件的集合
* 如果找不到滿足條件的,就返回null
*
* @param sequenceDates 已經按照時間順序排序了的數組
* @param intervalDate
* @param magic
* @return
*/
public static List<Date> attackTimes(List<Date> sequenceDates,long intervalDate,int magic){
if(sequenceDates == null || sequenceDates.size() < magic){
return null;
}
List<Date> res = new ArrayList<Date>();
for(int x = 0; x < sequenceDates.size() && x <= sequenceDates.size() - magic;x++){
Date souceDate = sequenceDates.get(x);
Date dateAfter5 = new Date(souceDate.getTime() + intervalDate);
res.add(souceDate);
for(int i = x + 1;i< sequenceDates.size();i++){
Date compareDate = sequenceDates.get(i);
if(compareDate.before(dateAfter5)){
res.add(compareDate);
}else
break;
}
if(res.size() >= magic)
return res;
else
res.clear();
}
return null;
}
public static List<Date> attackList(List<String> dateStrs,long intervalDate,int magic){
if(dateStrs == null || dateStrs.size() < magic){
return null;
}
List<Date> dates = new ArrayList<Date>();
for(String date : dateStrs){
if(date != null && !"".equals(date))
dates.add(stringToDate(date,"yyyy-MM-dd hh:mm:ss"));
}
Collections.sort(dates);
return attackTimes(dates,intervalDate,magic);
}
}