ItemCF: Item-Based Collaborative Filtering

1. Concept

ItemCF (Item Collaborative Filtering): item-based collaborative filtering.

Core idea of the algorithm: recommend to users items that are similar to the items they liked before.

For example, if user A has bought 《数据挖掘导论》 (Introduction to Data Mining), the algorithm will recommend 《机器学习》 (Machine Learning) based on that behavior. Note, however, that ItemCF does not use item content attributes to compute item similarity; it computes the similarity between items mainly by analyzing users' behavior records.

==> In other words, the algorithm considers item A and item B to be very similar because most of the users who like item A also like item B.
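As a point of reference (not shown explicitly in this post), this idea is usually formalized as the similarity

$$ w_{ij} = \frac{|N(i) \cap N(j)|}{\sqrt{|N(i)|\,|N(j)|}} $$

where $N(i)$ is the set of users who liked item $i$. The MapReduce implementation below skips the normalization in the denominator and works directly with the raw co-occurrence counts $|N(i) \cap N(j)|$.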

2. Principle

http://ov1nop9io.bkt.clouddn.com/1008304-20170302233332454-515456069.jpg

http://ov1nop9io.bkt.clouddn.com/1008304-20170302233332782-1213533378.jpg

http://ov1nop9io.bkt.clouddn.com/1008304-20170302233333235-1610357057.jpg

3. Algorithm Implementation

Steps

1. Build the item co-occurrence matrix A, i.e. count how many times every pair of items appears together.

Data format: Item_id1:Item_id2 count

2. Build the user-item rating matrix B, i.e. each user's rating of each item.

Data format: Item_id user_id:preference

3. Recommendation result = item co-occurrence matrix A × user rating matrix B (see the formula sketch after this list).

Data format: user_id item_id,score

4. Filter out the items the user has already rated.

5. Sort the recommendations by score from high to low.
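A brief sketch of what step 3 computes: for a user $u$ and a candidate item $i$,

$$ \mathrm{score}(u,i) = \sum_{j} A_{ij}\, B_{ju} = \sum_{j \in R(u)} A_{ij}\, r_{uj}, $$

where $A_{ij}$ is the co-occurrence count of items $i$ and $j$, $R(u)$ is the set of items user $u$ has rated, and $r_{uj}$ is $u$'s rating of item $j$. Steps 4 and 5 then drop the items already in $R(u)$ and sort the remaining candidates by score.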

Data

1,101,5.0

1,102,3.0

1,103,2.5

2,101,2.0

2,102,2.5

2,103,5.0

2,104,2.0

3,101,2.0

3,104,4.0

3,105,4.5

3,107,5.0

4,101,5.0

4,103,3.0

4,104,4.5

4,106,4.0

5,101,4.0

5,102,3.0

5,103,2.0

5,104,4.0

5,105,3.5

5,106,4.0

6,102,4.0

6,103,2.0

6,105,3.5

6,107,4.0

The Hadoop MapReduce program consists of four stages:

Stage 1: read the raw data and group it by user ID. The output file format is:

101:5.0,102:3.0,103:2.5                          (user 1)
101:2.0,102:2.5,103:5.0,104:2.0                  (user 2)
101:2.0,104:4.0,105:4.5,107:5.0                  (user 3)
101:5.0,103:3.0,106:4.0,104:4.5                  (user 4)
101:4.0,102:3.0,103:2.0,104:4.0,105:3.5,106:4.0  (user 5)
102:4.0,103:2.0,105:3.5,107:4.0                  (user 6)

Stage 2: count how many times every pair of items co-occurs, i.e. how many users rated both items (for example, items 101 and 102 are both rated by users 1, 2 and 5, so the output contains "101:102 3"). The output file format is:

101:101 5
101:102 3
101:103 4
101:104 4
101:105 2
101:106 2
101:107 1
102:101 3
102:102 4
102:103 4
102:104 2
102:105 2
102:106 1
102:107 1
103:101 4
103:102 4
103:103 5
103:104 3
103:105 2
103:106 2
103:107 1
104:101 4
104:102 2
104:103 3
104:104 4
104:105 2
104:106 2
104:107 1
105:101 2
105:102 2
105:103 2
105:104 2
105:105 3
105:106 1
105:107 2
106:101 2
106:102 1
106:103 2
106:104 2
106:105 1
106:106 2
107:101 1
107:102 1
107:103 1
107:104 1
107:105 2
107:107 2

Stage 3: generate the user rating matrix and the item co-occurrence matrix. The first mapper produces the user rating matrix, shown below:

Item_id user_id:preference
101 2:2.0
101 5:4.0
101 4:5.0
101 3:2.0
101 1:5.0
102 2:2.5
102 1:3.0
102 6:4.0
102 5:3.0
103 6:2.0
103 5:2.0
103 1:2.5
103 4:3.0
103 2:5.0
104 5:4.0
104 2:2.0
104 3:4.0
104 4:4.5
105 5:3.5
105 3:4.5
105 6:3.5
106 4:4.0
106 5:4.0
107 3:5.0
107 6:4.0

The second mapper generates the item co-occurrence matrix, shown below:

Item_id1:Item_id2 count
101:101 5
101:102 3
101:103 4
101:104 4
101:105 2
101:106 2
101:107 1
102:101 3
102:102 4
102:103 4
102:104 2
102:105 2
102:106 1
102:107 1
103:101 4
103:102 4
103:103 5
103:104 3
103:105 2
103:106 2
103:107 1
104:101 4
104:102 2
104:103 3
104:104 4
104:105 2
104:106 2
104:107 1
105:101 2
105:102 2
105:103 2
105:104 2
105:105 3
105:106 1
105:107 2
106:101 2
106:102 1
106:103 2
106:104 2
106:105 1
106:106 2
107:101 1
107:102 1
107:103 1
107:104 1
107:105 2
107:107 2

Stage 4: do the matrix multiplication. Recommendation result = item co-occurrence matrix A × user rating matrix B.

user_id item_id,score
1 107,10.5
1 106,18.0
1 105,21.0
1 104,33.5
1 103,44.5
1 102,37.0
1 101,44.0
2 107,11.5
2 106,20.5
2 105,23.0
2 104,36.0
2 103,49.0
2 102,40.0
2 101,45.5
3 107,25.0
3 106,16.5
3 105,35.5
3 104,38.0
3 103,34.0
3 102,28.0
3 101,40.0
4 107,12.5
4 106,33.0
4 105,29.0
4 104,55.0
4 103,56.5
4 102,40.0
4 101,63.0
5 107,20.0
5 106,34.5
5 105,40.5
5 104,59.0
5 103,65.0
5 102,51.0
5 101,68.0
6 107,21.0
6 106,11.5
6 105,30.5
6 104,25.0
6 103,37.0
6 102,35.0
6 101,31.0
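As a spot check of one entry: user 3's ratings are 101:2.0, 104:4.0, 105:4.5 and 107:5.0, and the co-occurrence counts for item 103 are 103:101 = 4, 103:104 = 3, 103:105 = 2 and 103:107 = 1, so

$$ \mathrm{score}(3, 103) = 4 \times 2.0 + 3 \times 4.0 + 2 \times 4.5 + 1 \times 5.0 = 34.0, $$

which matches the line "3 103,34.0" above.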


MapReduce implementation, step by step

The first MR job simply consolidates each user's records from the input data into one vector:

userid:1,vector:{103:2.5,102:3.0,101:5.0}  
userid:2,vector:{104:2.0,103:5.0,102:2.5,101:2.0}
userid:3,vector:{107:5.0,105:4.5,104:4.0,101:2.5}
userid:4,vector:{106:4.0,104:4.5,103:3.0,101:5.0}
userid:5,vector:{106:4.0,105:3.5,104:4.0,103:2.0,102:3.0,101:4.0}

public class WikiMapper1 extends Mapper<LongWritable,Text,VarLongWritable,LongAndFloat> { // class declaration not shown in the original; the name is assumed by analogy with WiKiReducer1

    public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException{
        VarLongWritable userID=new VarLongWritable();
        LongWritable itemID=new LongWritable();
        FloatWritable itemValue=new FloatWritable();
        String line=value.toString();
        String[] info=line.split(",");
        if(info.length!=3){ return; } // each input line is uid,itemid,preference
        userID.set(Long.parseLong(info[0]));
        itemID.set(Long.parseLong(info[1]));
        itemValue.set(Float.parseFloat(info[2]));
        context.write(userID, new LongAndFloat(itemID,itemValue));
    }
}

public class WiKiReducer1 extends Reducer<VarLongWritable,LongAndFloat,VarLongWritable,VectorWritable> {

    public void reduce(VarLongWritable userID,Iterable<LongAndFloat> itemPrefs,Context context) throws IOException, InterruptedException{
        // RandomAccessSparseVector(int cardinality, int initialCapacity)
        Vector userVector=new RandomAccessSparseVector(Integer.MAX_VALUE,10);
        for(LongAndFloat itemPref:itemPrefs){
            // set vector[itemID] = preference for every item this user rated
            userVector.set(Integer.parseInt(itemPref.getFirst().toString()),Float.parseFloat(itemPref.getSecond().toString()));
        }
        context.write(userID, new VectorWritable(userVector));
    }
}

The class LongAndFloat stores an (item ID, preference) pair and implements Hadoop's WritableComparable interface:

public class LongAndFloat implements WritableComparable<LongAndFloat> {
    private LongWritable first;   // item ID
    private FloatWritable second; // preference value

    public LongAndFloat(){
        set(new LongWritable(),new FloatWritable());
    }
    public LongAndFloat(LongWritable l,FloatWritable f){
        set(l,f);
    }
    public void set(LongWritable longWritable, FloatWritable floatWritable) {
        this.first=longWritable;
        this.second=floatWritable;
    }
    public LongWritable getFirst(){
        return first;
    }
    public FloatWritable getSecond(){
        return second;
    }
    @Override
    public void readFields(DataInput arg0) throws IOException {
        first.readFields(arg0);
        second.readFields(arg0);
    }
    @Override
    public void write(DataOutput arg0) throws IOException {
        first.write(arg0);
        second.write(arg0);
    }
    @Override
    public int compareTo(LongAndFloat o) {
        int cmp=first.compareTo(o.first);
        if(cmp!=0){
            return cmp;
        }
        return second.compareTo(o.second);
    }
}

The second MR job:

Its input is the output of MR(1). Since only the item-to-item co-occurrence matters here, the user ID is ignored and every pair of items that appears in the same user vector is emitted and counted.
The output should look like the following (Item_id1, then a vector of co-occurrence counts with each Item_id2):
101,{107:1.0,106:2.0,105:2.0,104:4.0,103:4.0,102:3.0,101:5.0}
102,{106:1.0,105:1.0,104:2.0,103:3.0,102:3.0,101:3.0}

public class WikiMapper2 extends Mapper<VarLongWritable,VectorWritable,IntWritable,IntWritable>{

    public void map(VarLongWritable userID,VectorWritable userVector,Context context) throws IOException, InterruptedException{
        // emit every ordered pair of items (index1, index2) that appears in this user's vector
        Iterator<Vector.Element> it=userVector.get().iterateNonZero();
        while(it.hasNext()){
            int index1=it.next().index();
            Iterator<Vector.Element> it2=userVector.get().iterateNonZero();
            while(it2.hasNext()){
                int index2=it2.next().index();
                context.write(new IntWritable(index1), new IntWritable(index2));
            }
        }
    }
}

public class WiKiReducer2 extends Reducer<IntWritable,IntWritable,IntWritable,VectorWritable> {

    public void reduce(IntWritable itemIndex1,Iterable<IntWritable> itemPrefs,Context context) throws IOException, InterruptedException{
        // RandomAccessSparseVector(int cardinality, int initialCapacity)
        Vector itemVector=new RandomAccessSparseVector(Integer.MAX_VALUE,10);
        for(IntWritable itemPref:itemPrefs){
            int itemIndex2=itemPref.get();
            itemVector.set(itemIndex2, itemVector.get(itemIndex2)+1.0); // count co-occurrences of (itemIndex1, itemIndex2)
        }
        context.write(itemIndex1, new VectorWritable(itemVector));
    }
}

The third MR job:

It contains two mappers. The first one, MR(31), converts the output of MR(2), i.e. the item co-occurrence matrix, into VectorOrPrefWritable; its input records look like:
101,{107:1.0,106:2.0,105:2.0,104:4.0,103:4.0,102:3.0,101:5.0}
The second one, MR(32), takes the output of MR(1), i.e. the user rating vectors such as
userid:5,vector:{106:4.0,105:3.5,104:4.0,103:2.0,102:3.0,101:4.0}
and, for every item in a user's vector, emits the item ID together with the (user ID, preference) pair, also wrapped in VectorOrPrefWritable.
Input: the user vectors produced by MR(1)
Map output: (itemId, VectorOrPrefWritable<userId, pref>)

public class WikiMapper31 extends Mapper<IntWritable,VectorWritable,IntWritable,VectorOrPrefWritable>{

    public void map(IntWritable key,VectorWritable value,Context context) throws IOException, InterruptedException{
        context.write(key, new VectorOrPrefWritable(value.get()));
    }
}

public class WiKiReducer31 extends Reducer<IntWritable,VectorOrPrefWritable,IntWritable,VectorOrPrefWritable> {

    public void reduce(IntWritable key,Iterable<VectorOrPrefWritable> values,Context context) throws IOException, InterruptedException{
        for(VectorOrPrefWritable va:values){
            context.write(key, va);
        }
    }
}
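The post does not include the code of the second mapper, MR(32). A minimal sketch of what it could look like, based on the description above; the class name WikiMapper32 is an assumption and does not come from the original post:

public class WikiMapper32 extends Mapper<VarLongWritable,VectorWritable,IntWritable,VectorOrPrefWritable> { // hypothetical class, not shown in the original

    public void map(VarLongWritable userID,VectorWritable userVector,Context context) throws IOException, InterruptedException{
        long uid=userID.get();
        Iterator<Vector.Element> it=userVector.get().iterateNonZero();
        while(it.hasNext()){
            Vector.Element e=it.next();
            // emit (itemID, <userID, preference>) so that MR(4) can join it with the co-occurrence row of the same item
            context.write(new IntWritable(e.index()), new VectorOrPrefWritable(uid,(float)e.get()));
        }
    }
}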

The fourth MR job:

Its map does nothing; its reduce simply merges the MR(31) and MR(32) records that share the same itemID (note that this job reads from two input paths). The output looks like:
101 {107:1.0,106:2.0,105:2.0,104:4.0,103:4.0,102:3.0,101:5.0} [5 1 4 2 3] [4.0 5.0 5.0 2.0 2.5]
The vector part is the co-occurrence row of item 101:
101 {107:1.0,106:2.0,105:2.0,104:4.0,103:4.0,102:3.0,101:5.0}
and the two lists are the user IDs and their preferences for item 101 from the rating matrix:
[5 1 4 2 3] [4.0 5.0 5.0 2.0 2.5]
i.e. the records
Item_id user_id:preference
101 2:2.0
101 5:4.0
101 4:5.0
101 3:2.0
101 1:5.0

public class WikiMapper4 extends Mapper<IntWritable,VectorOrPrefWritable,IntWritable,VectorOrPrefWritable> {

    public void map(IntWritable key,VectorOrPrefWritable value,Context context) throws IOException, InterruptedException{
        // identity mapper: pass every record through unchanged
        context.write(key, value);
    }
}

public class WiKiReducer4 extends Reducer<IntWritable,VectorOrPrefWritable,IntWritable,VectorAndPrefsWritable> {

    public void reduce(IntWritable key, Iterable<VectorOrPrefWritable> values,Context context) throws IOException, InterruptedException{
        List<Long> userfs=new ArrayList<Long>();   // user IDs that rated this item
        List<Float> prefs=new ArrayList<Float>();  // their preference values
        Vector v=null;                             // the co-occurrence row of this item
        for(VectorOrPrefWritable value:values){
            if(value.getVector()!=null){
                v=value.getVector();
            }else{
                userfs.add(value.getUserID());
                prefs.add(value.getValue());
            }
        }
        context.write(key, new VectorAndPrefsWritable(v,userfs,prefs));
    }
}
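The post does not show the driver that wires this job up; it only notes that the input comes from two paths. A minimal sketch of one way to configure it, assuming the MR(31) and MR(32) outputs are SequenceFiles of <IntWritable, VectorOrPrefWritable>; the path names and the driver snippet itself are placeholders, not part of the original (imports omitted, as elsewhere in this post):

Job job4 = Job.getInstance(new Configuration(), "itemcf-step4");
job4.setJarByClass(WikiMapper4.class);
job4.setMapperClass(WikiMapper4.class);
job4.setReducerClass(WiKiReducer4.class);
job4.setMapOutputKeyClass(IntWritable.class);
job4.setMapOutputValueClass(VectorOrPrefWritable.class);
job4.setOutputKeyClass(IntWritable.class);
job4.setOutputValueClass(VectorAndPrefsWritable.class);
job4.setInputFormatClass(SequenceFileInputFormat.class);
job4.setOutputFormatClass(SequenceFileOutputFormat.class);
// both intermediate outputs feed the same identity mapper
FileInputFormat.addInputPath(job4, new Path("output/step31"));
FileInputFormat.addInputPath(job4, new Path("output/step32"));
FileOutputFormat.setOutputPath(job4, new Path("output/step4"));
job4.waitForCompletion(true);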


The fifth MR job:

Map: for every user listed in each line of MR(4)'s output, multiply that user's rating (value) by the item's co-occurrence vector. For example, for user 3 in the first record above, Vectorforuser3 = [1.0 2.0 2.0 4.0 4.0 3.0 5.0] * 2.5, and the map emits key: 3, value: Vectorforuser3.
The map output should look like:
alluserids:[5, 1, 4, 2, 3]
,userid:5,vector:{107:4.0,106:8.0,105:8.0,104:16.0,103:16.0,102:12.0,101:20.0}
,userid:1,vector:{107:5.0,106:10.0,105:10.0,104:20.0,103:20.0,102:15.0,101:25.0}
,userid:4,vector:{107:5.0,106:10.0,105:10.0,104:20.0,103:20.0,102:15.0,101:25.0}
,userid:2,vector:{107:2.0,106:4.0,105:4.0,104:8.0,103:8.0,102:6.0,101:10.0}
,userid:3,vector:{107:2.5,106:5.0,105:5.0,104:10.0,103:10.0,102:7.5,101:12.5}

Combine: add up the map-output vectors that share the same key (userID) on each map node, giving partial per-user score vectors; the combine output looks like:
userid:1,vector:{107:5.0,106:18.0,105:15.5,104:33.5,103:39.0,102:31.5,101:44.0}
userid:2,vector:{107:4.0,106:20.5,105:15.5,104:36.0,103:41.5,102:32.5,101:45.5}
Reduce: sum the partial vectors for each user, filter out the items the user has already rated, then sort the rest by score from high to low; the result is the user's recommendation list.
The final output is:
1 [104:33.5,106:18.0,105:15.5,107:5.0]
2 [106:20.5,105:15.5,107:4.0]
3 [103:26.5,102:20.0,106:17.5]
4 [102:37.0,105:26.0,107:9.5]
5 [107:11.5]

public class WikiMapper5 extends Mapper<IntWritable,VectorAndPrefsWritable,VarLongWritable,VectorWritable>{

    public void map(IntWritable key,VectorAndPrefsWritable vectorAndPref,Context context) throws IOException, InterruptedException{
        Vector coo=vectorAndPref.getVector();            // co-occurrence row of this item
        List<Long> userIds=vectorAndPref.getUserIDs();
        List<Float> prefValues=vectorAndPref.getValues();
        for(int i=0;i<userIds.size();i++){
            long userID=userIds.get(i);
            float prefValue=prefValues.get(i);
            Vector par=coo.times(prefValue);             // partial score vector contributed by this item
            context.write(new VarLongWritable(userID), new VectorWritable(par));
        }
    }
}

public class WiKiCombiner5 extends Reducer<VarLongWritable,VectorWritable,VarLongWritable,VectorWritable> {

    public void reduce(VarLongWritable key, Iterable<VectorWritable> values,Context context) throws IOException, InterruptedException{
        // sum the partial score vectors for this user that were produced on this map node
        Vector partial=null;
        for(VectorWritable v:values){
            partial=partial==null?v.get():partial.plus(v.get());
        }
        context.write(key, new VectorWritable(partial));
        System.out.println("userid:"+key.toString()+",vector:"+partial); // debug output; should match the combine output shown above
    }
}

public class WiKiReducer5 extends Reducer<VarLongWritable,VectorWritable,VarLongWritable,RecommendedItemsWritable> {
    private int recommendationsPerUser=RECOMMENDATIONSPERUSER;
    private String path=JOB1OUTPATH;
    // userID -> that user's rating vector (as a string), loaded from the output of the first job
    private static FastMap<Integer,String> map=new FastMap<Integer,String>();

    public void setup(Context context) throws IOException{
        // read the SequenceFile produced by job 1 so that already-rated items can be filtered out later
        Configuration conf=new Configuration();
        FileSystem fs=FileSystem.get(URI.create(path), conf);
        Path tempPath=new Path(path);
        SequenceFile.Reader reader=null;
        try {
            reader=new SequenceFile.Reader(fs, tempPath, conf);
            Writable key=(Writable)ReflectionUtils.newInstance(reader.getKeyClass(),conf);
            Writable value=(Writable)ReflectionUtils.newInstance(reader.getValueClass(), conf);
            while (reader.next(key, value)) {
                map.put(Integer.parseInt(key.toString()), value.toString());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void reduce(VarLongWritable key, Iterable<VectorWritable> values,Context context) throws IOException, InterruptedException{
        int userID=(int)key.get();
        // sum the (possibly partial) vectors for this user to get the final score vector
        Vector rev=null;
        for(VectorWritable vec:values){
            rev=rev==null? vec.get():rev.plus(vec.get());
        }
        // keep the top-N items in a priority queue
        Queue<RecommendedItem> topItems=new PriorityQueue<RecommendedItem>(
            recommendationsPerUser+1,
            Collections.reverseOrder(ByValueRecommendedItemComparator.getInstance()));
        Iterator<Vector.Element> recommendationVectorIterator=rev.iterateNonZero();
        while(recommendationVectorIterator.hasNext()){
            Vector.Element e=recommendationVectorIterator.next();
            int index=e.index(); // the vector index is the item ID
            if(!hasItem(userID,String.valueOf(index))){ // skip items the user has already rated
                float value=(float) e.get();
                if(topItems.size()<recommendationsPerUser){
                    topItems.add(new GenericRecommendedItem(index,value));
                }else if(value>topItems.peek().getValue()){
                    topItems.add(new GenericRecommendedItem(index,value));
                    topItems.poll();
                }
            }
        }
        List<RecommendedItem> recom=new ArrayList<RecommendedItem>(topItems.size());
        recom.addAll(topItems);
        Collections.sort(recom,ByValueRecommendedItemComparator.getInstance());
        context.write(key, new RecommendedItemsWritable(recom));
    }

    // check whether the user has already rated the item
    // (a simple substring check against the user's rating vector string; fine for this demo data,
    //  but it could give false positives for item IDs of different lengths)
    public static boolean hasItem(int user,String item){
        boolean flag=false;
        String items=map.get(user);
        if(items.contains(item)){
            flag=true;
        }
        return flag;
    }
}
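The driver for this last job is not shown in the post either. For the combine step described above to run, the combiner class has to be registered in the job configuration; a minimal sketch under the same assumptions as before (placeholder paths, imports omitted):

Job job5 = Job.getInstance(new Configuration(), "itemcf-step5");
job5.setJarByClass(WikiMapper5.class);
job5.setMapperClass(WikiMapper5.class);
job5.setCombinerClass(WiKiCombiner5.class);   // per-node partial sums of the per-user score vectors
job5.setReducerClass(WiKiReducer5.class);
job5.setMapOutputKeyClass(VarLongWritable.class);
job5.setMapOutputValueClass(VectorWritable.class);
job5.setOutputKeyClass(VarLongWritable.class);
job5.setOutputValueClass(RecommendedItemsWritable.class);
job5.setInputFormatClass(SequenceFileInputFormat.class);  // assumes MR(4) wrote a SequenceFile
FileInputFormat.addInputPath(job5, new Path("output/step4"));
FileOutputFormat.setOutputPath(job5, new Path("output/step5"));
job5.waitForCompletion(true);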