RecordWriter recordWriter = uploadSession.openRecordWriter(0); ArrayRecord record = (ArrayRecord) uploadSession.newRecord(); // prepare data List arrayData = Arrays.asList(1, 2, 3); Map mapData = new HashMap(); mapData.put("a", 1L); mapData.put("c", 2L); List
从MaxCompute下载复杂类型数据
代码示例:
RecordReader recordReader = downloadSession.openRecordReader(0, 1); // read the record ArrayRecord record1 = (ArrayRecord)recordReader.read(); // get array field data List field0 = record1.getArray(0); List longField0 = record1.getArray(Long.class, 0); // get map field data Map field1 = record1.getMap(1); Map typedField1 = record1.getMap(String.class, Long.class, 1); // get struct field data Struct field2 = record1.getStruct(2);
运行实例
完整代码如下:
import java.io.IOException;import java.util.ArrayList;import java.util.Arrays;import java.util.HashMap;import java.util.List;import java.util.Map;import com.aliyun.odps.Odps;import com.aliyun.odps.PartitionSpec;import com.aliyun.odps.TableSchema;import com.aliyun.odps.account.Account;import com.aliyun.odps.account.AliyunAccount;import com.aliyun.odps.data.ArrayRecord;import com.aliyun.odps.data.RecordReader;import com.aliyun.odps.data.RecordWriter;import com.aliyun.odps.data.SimpleStruct;import com.aliyun.odps.data.Struct;import com.aliyun.odps.tunnel.TableTunnel;import com.aliyun.odps.tunnel.TableTunnel.UploadSession;import com.aliyun.odps.tunnel.TableTunnel.DownloadSession;import com.aliyun.odps.tunnel.TunnelException;import com.aliyun.odps.type.StructTypeInfo;public class TunnelComplexTypeSample { private static String accessId = ""; private static String accessKey = ""; private static String odpsUrl = ""; private static String project = ""; private static String table = ""; // partitions of a partitioned table, eg: "pt=\'1\',ds=\'2\'" // if the table is not a partitioned table, do not need it private static String partition = ""; public static void main(String args[]) { Account account = new AliyunAccount(accessId, accessKey); Odps odps = new Odps(account); odps.setEndpoint(odpsUrl); odps.setDefaultProject(project); try { TableTunnel tunnel = new TableTunnel(odps); PartitionSpec partitionSpec = new PartitionSpec(partition); // ---------- Upload Data --------------- // create upload session for table // the table schema is {"col0": ARRAY, "col1": MAP, "col2": STRUCT} UploadSession uploadSession = tunnel.createUploadSession(project, table, partitionSpec); // get table schema TableSchema schema = uploadSession.getSchema(); // open record writer RecordWriter recordWriter = uploadSession.openRecordWriter(0); ArrayRecord record = (ArrayRecord) uploadSession.newRecord(); // prepare data List arrayData = Arrays.asList(1, 2, 3); Map mapData = new HashMap(); mapData.put("a", 1L); mapData.put("c", 2L); List structData = new ArrayList(); structData.add("Lily"); structData.add(18); // set data to record record.setArray(0, arrayData); record.setMap(1, mapData); record.setStruct(2, new SimpleStruct((StructTypeInfo) schema.getColumn(2).getTypeInfo(), structData)); // write the record recordWriter.write(record); // close writer recordWriter.close(); // commit uploadSession, the upload finish uploadSession.commit(new Long[]{0L}); System.out.println("upload success!"); // ---------- Download Data --------------- // create download session for table // the table schema is {"col0": ARRAY, "col1": MAP, "col2": STRUCT} DownloadSession downloadSession = tunnel.createDownloadSession(project, table, partitionSpec); schema = downloadSession.getSchema(); // open record reader, read one record here for example RecordReader recordReader = downloadSession.openRecordReader(0, 1); // read the record ArrayRecord record1 = (ArrayRecord)recordReader.read(); // get array field data List field0 = record1.getArray(0); List longField0 = record1.getArray(Long.class, 0); // get map field data Map field1 = record1.getMap(1); Map typedField1 = record1.getMap(String.class, Long.class, 1); // get struct field data Struct field2 = record1.getStruct(2); System.out.println("download success!"); } catch (TunnelException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } }}