MapReduce: Reading Data from a MySQL Database

The example below uses the old mapred API's DBInputFormat to read rows from a MySQL table as map input. Each row is deserialized into a custom type that implements both Writable and DBWritable (StudentRecord), the mapper re-keys each record by its id, and an identity-style reducer writes the records to HDFS as text.
import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Iterator;

import org.apache.hadoop.examples.EJob;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapred.lib.db.DBInputFormat;
import org.apache.hadoop.mapred.lib.db.DBWritable;

public class ReadDB {

    // Mapper: emits the record's id as the key and the formatted record as the value
    public static class Map extends MapReduceBase implements
            Mapper<LongWritable, StudentRecord, LongWritable, Text> {

        public void map(LongWritable key, StudentRecord value,
                OutputCollector<LongWritable, Text> collector, Reporter reporter)
                throws IOException {
            collector.collect(new LongWritable(value.id),
                    new Text(value.toString()));
        }
    }

    // Reducer: identity pass-through, writes every value unchanged
    public static class Reduce extends MapReduceBase implements
            Reducer<LongWritable, Text, LongWritable, Text> {

        @Override
        public void reduce(LongWritable key, Iterator<Text> values,
                OutputCollector<LongWritable, Text> collector, Reporter reporter)
                throws IOException {
            while (values.hasNext()) {
                collector.collect(key, values.next());
            }
        }
    }

    // Custom record type: Writable for Hadoop serialization, DBWritable for JDBC I/O
    public static class StudentRecord implements Writable, DBWritable {

        public int id;
        public String name;
        public String sex;
        public int age;

        @Override
        public void readFields(DataInput in) throws IOException {
            this.id = in.readInt();
            this.name = Text.readString(in);
            this.sex = Text.readString(in);
            this.age = in.readInt();
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(this.id);
            Text.writeString(out, this.name);
            Text.writeString(out, this.sex);
            out.writeInt(this.age);
        }

        @Override
        public void readFields(ResultSet result) throws SQLException {
            this.id = result.getInt(1);
            this.name = result.getString(2);
            this.sex = result.getString(3);
            this.age = result.getInt(4);
        }

        @Override
        public void write(PreparedStatement stmt) throws SQLException {
            stmt.setInt(1, this.id);
            stmt.setString(2, this.name);
            stmt.setString(3, this.sex);
            stmt.setInt(4, this.age);
        }

        @Override
        public String toString() {
            return "id:" + this.id + "_name:" + this.name
                    + "_sex:" + this.sex + "_age:" + this.age;
        }
    }

    @SuppressWarnings("deprecation")
    public static void main(String[] args) throws Exception {

        JobConf conf = new JobConf(ReadDB.class);
        // Point the client at the HDFS NameNode and the JobTracker
        conf.set("fs.default.name", "hdfs://192.168.71.128:9000");
        conf.set("mapred.job.tracker", "192.168.71.128:9001");
        conf.set("dfs.permissions", "false");

        // EJob is a small helper class (placed under org.apache.hadoop.examples here)
        // that packages the compiled classes under bin/ into a temporary jar so the
        // job can be submitted from an IDE
        File jarFile = EJob.createTempJar("bin");
        EJob.addClasspath("/usr/hadoop/conf");
        ClassLoader classLoader = EJob.getClassLoader();
        Thread.currentThread().setContextClassLoader(classLoader);
        conf.setJar(jarFile.toString());

        // Ship the MySQL JDBC driver to the task classpath via the distributed cache
        DistributedCache.addFileToClassPath(new Path(
                "/usr/hadoop/lib/mysql-connector-java-5.1.18-bin.jar"), conf);
        Class.forName("com.mysql.jdbc.Driver");

        // Set the mapper and reducer classes
        conf.setMapperClass(Map.class);
        conf.setReducerClass(Reduce.class);

        // Configure the JDBC connection
        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
                "jdbc:mysql://192.168.71.128:3306/song", "root", "mysql");

        // Table columns to read; order must match StudentRecord.readFields(ResultSet)
        String[] fields = { "id", "name", "sex", "age" };
        DBInputFormat.setInput(conf, StudentRecord.class, "student", null, "id", fields);

        // Read input from the database rather than from files
        conf.setInputFormat(DBInputFormat.class);

        // Output key/value types
        conf.setOutputKeyClass(LongWritable.class);
        conf.setOutputValueClass(Text.class);

        // Output path on HDFS
        FileOutputFormat.setOutputPath(conf, new Path("rdb_out"));

        JobClient.runJob(conf);
    }
}
Note: the MySQL JDBC driver jar must also be added to the project's build path, in addition to being shipped to the tasks via the distributed cache.
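
For completeness, here is a minimal JDBC sketch that creates and populates a student table this job can read. The column names and their order come from the fields array above; the exact column types and the sample rows are assumptions for illustration.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class PrepareStudentTable {
    public static void main(String[] args) throws Exception {
        Class.forName("com.mysql.jdbc.Driver");
        // Same connection settings as in ReadDB.main()
        Connection conn = DriverManager.getConnection(
                "jdbc:mysql://192.168.71.128:3306/song", "root", "mysql");
        Statement stmt = conn.createStatement();
        // Column order matches the fields array; the types here are assumed
        stmt.executeUpdate("CREATE TABLE IF NOT EXISTS student ("
                + "id INT PRIMARY KEY, name VARCHAR(32), sex VARCHAR(8), age INT)");
        // Sample rows (hypothetical data, for illustration only)
        stmt.executeUpdate("INSERT INTO student VALUES (1, 'zhangsan', 'male', 20)");
        stmt.executeUpdate("INSERT INTO student VALUES (2, 'lisi', 'female', 19)");
        stmt.close();
        conn.close();
    }
}

With those two sample rows, the job output (viewable with hadoop fs -cat rdb_out/part-00000) would contain one line per record, e.g. the key 1 followed by id:1_name:zhangsan_sex:male_age:20, since the reducer simply passes through what the mapper emits.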