diff --git a/src/reader/arrow_transform.ts b/src/reader/arrow_transform.ts
index ca9e8b44..01a3bda3 100644
--- a/src/reader/arrow_transform.ts
+++ b/src/reader/arrow_transform.ts
@@ -17,7 +17,7 @@ import {
   RecordBatchReader,
   RecordBatch,
   RecordBatchStreamReader,
-  Vector,
+  DataType,
 } from 'apache-arrow';
 import * as protos from '../../protos/protos';
 
@@ -140,12 +140,13 @@ export class ArrowRecordBatchTableRowTransform extends Transform {
     }
     for (let j = 0; j < batch.numCols; j++) {
       const column = batch.selectAt([j]);
-      const columnName = column.schema.fields[0].name;
+      const field = column.schema.fields[0];
+      const columnName = field.name;
       for (let i = 0; i < batch.numRows; i++) {
         const fieldData = column.get(i);
         const fieldValue = fieldData?.toJSON()[columnName];
         rows[i].f[j] = {
-          v: convertArrowValue(fieldValue),
+          v: convertArrowValue(fieldValue, field.type as DataType),
         };
       }
     }
@@ -156,21 +157,36 @@
   }
 }
 
-function convertArrowValue(fieldValue: any): any {
-  if (typeof fieldValue === 'object') {
-    if (fieldValue instanceof Vector) {
-      const arr = fieldValue.toJSON();
-      return arr.map((v: any) => {
-        return {v: convertArrowValue(v)};
-      });
-    }
-    const tableRow: TableRow = {f: []};
+function convertArrowValue(fieldValue: any, type: DataType): any {
+  if (fieldValue === null) {
+    return null;
+  }
+  if (DataType.isList(type)) {
+    const arr = fieldValue.toJSON();
+    return arr.map((v: any) => {
+      // BigQuery arrays are homogeneous, so the first child's type
+      // describes every element of the list.
+      const elemType = type.children[0].type;
+      return {v: convertArrowValue(v, elemType)};
+    });
+  }
+  if (DataType.isStruct(type)) {
+    const tableRow: TableRow = {};
     Object.keys(fieldValue).forEach(key => {
-      tableRow.f?.push({
-        v: convertArrowValue(fieldValue[key]),
+      const elemType = type.children.find(f => f.name === key);
+      if (!tableRow.f) {
+        tableRow.f = [];
+      }
+      tableRow.f.push({
+        v: convertArrowValue(fieldValue[key], elemType?.type as DataType),
       });
     });
     return tableRow;
   }
+  if (DataType.isTimestamp(type)) {
+    // Arrow JS surfaces timestamps as epoch milliseconds; convert to
+    // microseconds so the value matches what BigQuery.timestamp expects.
+    return fieldValue * 1000;
+  }
   return fieldValue;
 }
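For reference, the {v}/{f} shapes the rewritten convertArrowValue emits can be sketched without a live Arrow stream. The snippet below is illustrative only and not part of the patch: the names TableCell, TableRowSketch, and toMicros are invented for the example, and the input values are hand-written rather than read from a RecordBatch. It shows a REPEATED INT64 column becoming an array of {v} wrappers and a RECORD column becoming a nested {f: [...]} row, with the millisecond-to-microsecond timestamp conversion applied.

// Standalone sketch (assumed names, not library API).
type TableCell = {v: unknown};
type TableRowSketch = {f?: TableCell[]};

// Arrow JS surfaces timestamps as epoch milliseconds; BigQuery.timestamp
// expects microseconds, hence the `* 1000` in the patch.
function toMicros(epochMs: number): number {
  return epochMs * 1000;
}

// A REPEATED INT64 column becomes an array of {v} wrappers, with INT64
// values rendered as strings, matching the system test's expectations.
const list: TableCell = {
  v: [1, 2, 3].map(n => ({v: String(n)})),
};

// A RECORD column becomes a nested {f: [...]} row; a NULL child stays null.
const metadata: TableRowSketch = {
  f: [
    {v: toMicros(Date.parse('2020-04-27T18:07:25.356Z'))}, // created_at
    {v: null}, // updated_at was NULL
  ],
};

console.log(JSON.stringify({f: [list, metadata]}, null, 2));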
diff --git a/system-test/reader_client_test.ts b/system-test/reader_client_test.ts
index 82fd91a7..36effd48 100644
--- a/system-test/reader_client_test.ts
+++ b/system-test/reader_client_test.ts
@@ -72,6 +72,33 @@ describe('reader.ReaderClient', () => {
         type: 'INTEGER',
         mode: 'REQUIRED',
       },
+      {
+        name: 'optional',
+        type: 'STRING',
+        mode: 'NULLABLE',
+      },
+      {
+        name: 'list',
+        type: 'INT64',
+        mode: 'REPEATED',
+      },
+      {
+        name: 'metadata',
+        type: 'RECORD',
+        mode: 'NULLABLE',
+        fields: [
+          {
+            name: 'created_at',
+            type: 'TIMESTAMP',
+            mode: 'REQUIRED',
+          },
+          {
+            name: 'updated_at',
+            type: 'TIMESTAMP',
+            mode: 'NULLABLE',
+          },
+        ],
+      },
     ],
   };
 
@@ -97,9 +124,26 @@ describe('reader.ReaderClient', () => {
       .dataset(datasetId)
       .table(tableId)
       .insert([
-        {name: 'Ada Lovelace', row_num: 1},
-        {name: 'Alan Turing', row_num: 2},
-        {name: 'Bell', row_num: 3},
+        {
+          name: 'Ada Lovelace',
+          row_num: 1,
+          optional: 'Some data',
+          list: [1],
+          metadata: {
+            created_at: bigquery.timestamp('2020-04-27T18:07:25.356Z'),
+            updated_at: bigquery.timestamp('2020-04-27T20:07:25.356Z'),
+          },
+        },
+        {
+          name: 'Alan Turing',
+          row_num: 2,
+          optional: 'Some other data',
+          list: [1, 2],
+          metadata: {
+            created_at: bigquery.timestamp('2020-04-27T18:07:25.356Z'),
+          },
+        },
+        {name: 'Bell', row_num: 3, list: [1, 2, 3]},
       ]);
   });
 
@@ -218,7 +262,7 @@ describe('reader.ReaderClient', () => {
      const table = await tableFromIPC(content);
 
      assert.equal(table.numRows, 3);
-      assert.equal(table.numCols, 2);
+      assert.equal(table.numCols, 5);
 
       reader.close();
     } finally {
@@ -253,7 +297,7 @@ describe('reader.ReaderClient', () => {
      const table = new Table(batches);
 
      assert.equal(table.numRows, 3);
-      assert.equal(table.numCols, 2);
+      assert.equal(table.numCols, 5);
 
       reader.close();
     } finally {
@@ -295,6 +339,143 @@ describe('reader.ReaderClient', () => {
 
       assert.equal(rows.length, 3);
 
+      assert.deepEqual(rows, [
+        {
+          f: [
+            {
+              v: 'Ada Lovelace',
+            },
+            {
+              v: '1',
+            },
+            {
+              v: 'Some data',
+            },
+            {
+              v: [
+                {
+                  v: '1',
+                },
+              ],
+            },
+            {
+              v: {
+                f: [
+                  {
+                    v: 1588010845356000,
+                  },
+                  {
+                    v: 1588018045356000,
+                  },
+                ],
+              },
+            },
+          ],
+        },
+        {
+          f: [
+            {
+              v: 'Alan Turing',
+            },
+            {
+              v: '2',
+            },
+            {
+              v: 'Some other data',
+            },
+            {
+              v: [
+                {
+                  v: '1',
+                },
+                {
+                  v: '2',
+                },
+              ],
+            },
+            {
+              v: {
+                f: [
+                  {
+                    v: 1588010845356000,
+                  },
+                  {
+                    v: null,
+                  },
+                ],
+              },
+            },
+          ],
+        },
+        {
+          f: [
+            {
+              v: 'Bell',
+            },
+            {
+              v: '3',
+            },
+            {
+              v: null,
+            },
+            {
+              v: [
+                {
+                  v: '1',
+                },
+                {
+                  v: '2',
+                },
+                {
+                  v: '3',
+                },
+              ],
+            },
+            {
+              v: null,
+            },
+          ],
+        },
+      ]);
+      const mergedRows = BigQuery.mergeSchemaWithRows_(schema, rows, {
+        wrapIntegers: false,
+      });
+      assert.deepEqual(mergedRows, [
+        {
+          name: 'Ada Lovelace',
+          row_num: 1,
+          optional: 'Some data',
+          list: [1],
+          metadata: {
+            created_at: {
+              value: '2020-04-27T18:07:25.356Z',
+            },
+            updated_at: {
+              value: '2020-04-27T20:07:25.356Z',
+            },
+          },
+        },
+        {
+          name: 'Alan Turing',
+          row_num: 2,
+          optional: 'Some other data',
+          list: [1, 2],
+          metadata: {
+            created_at: {
+              value: '2020-04-27T18:07:25.356Z',
+            },
+            updated_at: null,
+          },
+        },
+        {
+          name: 'Bell',
+          row_num: 3,
+          list: [1, 2, 3],
+          optional: null,
+          metadata: null,
+        },
+      ]);
+
       reader.close();
     } finally {
       client.close();
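As a quick sanity check of the timestamp arithmetic asserted above (again illustrative, outside the patch itself), the microsecond epoch values the transform produces divide back down to milliseconds and format to the ISO strings expected in mergedRows:

// 1588010845356000 is the created_at value asserted in the rows above.
const createdAtMicros = 1588010845356000;
const iso = new Date(createdAtMicros / 1000).toISOString();
console.log(iso); // 2020-04-27T18:07:25.356Z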