Record processing

Connectors built with the connector builder always make HTTP requests, receive the responses and emit records. Besides making the right requests, it's important to properly hand over the records to the system:

  • Extract the records (record selection)
  • Do optional post-processing (transformations)
  • Provide record meta data to the system to inform downstream processes (primary key and declared schema)

Record selection

When doing HTTP requests, the connector expects the records to be part of the response JSON body. The "Record selector" field of the stream needs to be set to the property of the response object that holds the records.

Very often, the response body contains an array of records along with some supplementary information (for example meta data for pagination).

For example the "Most popular" NY Times API returns the following response body:

{
    "status": "OK",
    "copyright": "Copyright (c) 2023 The New York Times Company.  All Rights Reserved.",
    "num_results": 20,
    "results": [
      {
        "uri": "nyt://article/c15e5227-ed68-54d9-9e5b-acf5a451ec37",
        "url": "https://www.nytimes.com/2023/04/16/us/science-of-reading-literacy-parents.html",
        "id": 100000008811231,
        "asset_id": 100000008811231,
        "source": "New York Times",
        // ...
      },
      // ..
    ],
    // ...
}

Setting the record selector to results selects the array with the actual records; everything else is discarded.

Nested objects

In some cases the array of actual records is nested multiple levels deep in the response, like for the "Archive" NY Times API:

{
    "copyright": "Copyright (c) 2020 The New York Times Company. All Rights Reserved.",
    "response": {
      "docs": [
        {
          "abstract": "From the Treaty of Versailles to Prohibition, the events of that year shaped America, and the world, for a century to come. ",
          "web_url": "https://www.nytimes.com/2018/12/31/opinion/1919-america.html",
          "snippet": "From the Treaty of Versailles to Prohibition, the events of that year shaped America, and the world, for a century to come. ",
          // ...
        },
        // ...
      ]
    }
}

Setting the record selector to "response,docs" selects the nested array.
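
The path lookup described above can be sketched in a few lines of Python. This is only an illustration of the idea, not the connector builder's implementation; the function name select_by_path and the trimmed sample bodies are made up for this example.

```python
from typing import Any, List


def select_by_path(body: Any, path: List[str]) -> Any:
    """Follow each property name in `path`, starting at the response body."""
    current = body
    for key in path:
        current = current[key]
    return current


# Trimmed stand-ins for the two NY Times responses shown above.
most_popular = {"status": "OK", "results": [{"id": 100000008811231}]}
archive = {
    "response": {
        "docs": [
            {"web_url": "https://www.nytimes.com/2018/12/31/opinion/1919-america.html"}
        ]
    }
}

popular_records = select_by_path(most_popular, ["results"])
archive_records = select_by_path(archive, ["response", "docs"])
```

Everything outside the selected array (for example status) is simply never looked at, which matches the "everything else is discarded" behavior.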

Root array

In some cases, the response body itself is an array of records, like in the CoinAPI API:

[
  {
    "symbol_id": "BITSTAMP_SPOT_BTC_USD",
    "time_exchange": "2013-09-28T22:40:50.0000000Z",
    "time_coinapi": "2017-03-18T22:42:21.3763342Z",
    // ...
  },
  {
    "symbol_id": "BITSTAMP_SPOT_BTC_USD",
    "time_exchange": "2013-09-28T22:40:50.0000000Z",
    "time_coinapi": "2017-03-18T22:42:21.3763342Z",
   // ..
  }
  // ...
]

In this case, the record selector can be omitted and the whole response becomes the list of records.

Single object

Sometimes the API returns only one record per request. In this case, the record selector can point to an object instead of an array, and that object is handled as the only record. The Exchange Rates API is an example:

{
    "success": true,
    "historical": true,
    "date": "2013-12-24",
    "timestamp": 1387929599,
    "base": "GBP",
    "rates": {
        "USD": 1.636492,
        "EUR": 1.196476,
        "CAD": 1.739516
    }
}

In this case, a record selector of rates will yield a single record which contains all the exchange rates in a single object.
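
The root-array and single-object cases can be thought of as one normalization step: whatever the selector points at is turned into a list of records. The following Python sketch shows that idea under the assumption that anything other than an array or an object yields no records; the helper name to_records is hypothetical and not part of the connector builder.

```python
from typing import Any, Dict, List


def to_records(selected: Any) -> List[Dict[str, Any]]:
    """Turn the selected part of a response into a list of records."""
    if isinstance(selected, list):
        # Root array or selected array: every item is a record.
        return selected
    if isinstance(selected, dict):
        # Single object: it becomes the only record.
        return [selected]
    # Assumption for this sketch: scalars and null yield no records.
    return []


# Trimmed stand-ins for the CoinAPI and Exchange Rates responses above.
root_array_body = [
    {"symbol_id": "BITSTAMP_SPOT_BTC_USD"},
    {"symbol_id": "BITSTAMP_SPOT_BTC_USD"},
]
exchange_body = {
    "base": "GBP",
    "rates": {"USD": 1.636492, "EUR": 1.196476, "CAD": 1.739516},
}

coin_records = to_records(root_array_body)          # selector omitted
rate_records = to_records(exchange_body["rates"])   # selector "rates"
```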

Fields nested in arrays

In some cases, the records are located in multiple branches of the response object (for example within each item of an array):


{
  "data": [
    {
      "record": {
        "id": "1"
      }
    },
    {
      "record": {
        "id": "2"
      }
    }
  ]
}

In this case, the record selector can use the placeholder *, which selects all children at the current position in the path. Here, the selector data, *, record returns the following records:

[
  {
    "id": "1"
  },
  {
    "id": "2"
  }
]
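
One way to picture how the * placeholder fans out over the children is the recursive sketch below. It is a simplified illustration, not the connector builder's actual selector; the function name select_with_wildcard is an assumption made for this example.

```python
from typing import Any, List


def select_with_wildcard(node: Any, path: List[str]) -> List[Any]:
    """Collect every value reachable via `path`, where "*" matches all children."""
    if not path:
        return [node]
    head, rest = path[0], path[1:]
    if head == "*":
        if isinstance(node, list):
            children = node
        elif isinstance(node, dict):
            children = list(node.values())
        else:
            children = []
    elif isinstance(node, dict) and head in node:
        children = [node[head]]
    else:
        children = []
    matches: List[Any] = []
    for child in children:
        matches.extend(select_with_wildcard(child, rest))
    return matches


body = {"data": [{"record": {"id": "1"}}, {"record": {"id": "2"}}]}
records = select_with_wildcard(body, ["data", "*", "record"])
# records == [{"id": "1"}, {"id": "2"}]
```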

Transformations

It is recommended not to change records during extraction, but to load them into the downstream warehouse unchanged and perform any necessary transformations there, which keeps you flexible about what data is required. However, there are some reasons to modify the fields of records before they are sent to the warehouse:

  • Remove personally identifiable information (PII) to ensure compliance with local legislation
  • Pseudonymise sensitive fields
  • Remove large fields that don't contain interesting information and significantly increase load on the system

The "transformations" feature can be used for these purposes.

Removing fields

To remove a field from a record, add a new transformation of type "remove" in the "Transformations" section and enter the field path. For example, in the case of the EmailOctopus API, the campaigns records also include the HTML content of the mailing, which takes up a lot of space:

{
  "data": [
    {
      "id": "00000000-0000-0000-0000-000000000000",
      "status": "SENT",
      "name": "Foo",
      "subject": "Bar",
      "from": {
        "name": "John Doe",
        "email_address": "john.doe@gmail.com"
      },
      "content": {
        "html": "<html>lots of text here...</html>",
        "plain_text": "Lots of plain text here...."
      },
      "created_at": "2023-04-13T15:28:37+00:00",
      "sent_at": "2023-04-14T15:28:37+00:00"
    }
  ]
}

Setting the "Path" of the remove-transformation to content removes the content field, including its nested html and plain_text properties, from the records:

{
  "id": "00000000-0000-0000-0000-000000000000",
  "status": "SENT",
  "from": {
    "name": "John Doe",
    "email_address": "john.doe@gmail.com"
  },
  "name": "Foo",
  "subject": "Bar",
  "created_at": "2023-04-13T15:28:37+00:00",
  "sent_at": "2023-04-14T15:28:37+00:00"
}

As with the record selector, properties of deeply nested objects can be removed as well by specifying the path of properties leading to the target field.

Removing fields that match a glob pattern

Imagine that a property should be removed from the data regardless of the level at which it appears. This can be achieved by adding a ** to the "Path" - for example "**, name" will remove all "name" fields anywhere in the record:

{
  "id": "00000000-0000-0000-0000-000000000000",
  "status": "SENT",
  "from": {
    "email_address": "john.doe@gmail.com"
  },
  "subject": "Bar",
  "created_at": "2023-04-13T15:28:37+00:00",
  "sent_at": "2023-04-14T15:28:37+00:00"
}

The * character can also be used as a placeholder to filter for all fields that start with a certain prefix - the "Path" s* will remove all fields from the top level that start with the character s:

{
  "id": "00000000-0000-0000-0000-000000000000",
  "from": {
    "email_address": "john.doe@gmail.com"
  },
  "created_at": "2023-04-13T15:28:37+00:00"
}
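
The removal rules above (plain paths, ** for any depth, and * inside a field name) can be approximated with Python's fnmatch module. This is a rough sketch under stated assumptions: the helper names are hypothetical, the matching semantics are a simplification of what the "remove" transformation may do, and arrays inside the record are not descended into.

```python
import copy
from fnmatch import fnmatchcase
from typing import Any, Dict, List


def _remove(node: Any, pattern: List[str]) -> None:
    """Delete all fields under `node` that match the path pattern, in place."""
    if not isinstance(node, dict) or not pattern:
        return
    head, rest = pattern[0], pattern[1:]
    if head == "**":
        # "**" may match zero levels: apply the rest of the pattern here ...
        _remove(node, rest)
        # ... or one or more levels: keep "**" and descend into the children.
        for child in list(node.values()):
            _remove(child, pattern)
        return
    for key in list(node.keys()):
        if fnmatchcase(key, head):
            if rest:
                _remove(node[key], rest)
            else:
                del node[key]


def remove_fields(record: Dict[str, Any], pattern: List[str]) -> Dict[str, Any]:
    """Return a copy of `record` without the fields matching `pattern`."""
    result = copy.deepcopy(record)
    _remove(result, pattern)
    return result


campaign = {
    "id": "00000000-0000-0000-0000-000000000000",
    "status": "SENT",
    "name": "Foo",
    "subject": "Bar",
    "from": {"name": "John Doe", "email_address": "john.doe@gmail.com"},
    "created_at": "2023-04-13T15:28:37+00:00",
    "sent_at": "2023-04-14T15:28:37+00:00",
}

without_names = remove_fields(campaign, ["**", "name"])
without_s_fields = remove_fields(without_names, ["s*"])
```

Working on a deep copy keeps the original record intact, which makes it easy to compare the record before and after each removal step.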

Adding fields

Adding fields can be used to apply a hashing function to an existing field to pseudonymize it. To do this, add a new transformation in the "Transformations" section of type "add" and enter the field path and the new value. For example in case of the EmailOctopus API, the campaigns records include the name of the sender:

{
  "data": [
    {
      "id": "00000000-0000-0000-0000-000000000000",
      "status": "SENT",
      "name": "Foo",
      "subject": "Bar",
      "from": {
        "name": "John Doe",
        "email_address": "john.doe@gmail.com"
      },
      "created_at": "2023-04-13T15:28:37+00:00",
      "sent_at": "2023-04-14T15:28:37+00:00"
    }
  ]
}

To apply a hash function to it, set the "Path" to "from, name" to select the name property nested in the from object and set the value to {{ record['from']['name'] | hash('md5') }}. This hashes the name in the record:

{
  "id": "00000000-0000-0000-0000-000000000000",
  "status": "SENT",
  "name": "Foo",
  "subject": "Bar",
  "from": {
    "name": "4c2a904bafba06591225113ad17b5cec",
    "email_address": "john.doe@gmail.com"
  },
  "created_at": "2023-04-13T15:28:37+00:00",
  "sent_at": "2023-04-14T15:28:37+00:00"
}
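
To get a feel for what such a hashing step does to a record, the following sketch uses Python's hashlib as a stand-in for the hash('md5') filter. It assumes the value is hashed as a UTF-8 encoded string and rendered as a hex digest; the helper name md5_hex is made up for this example.

```python
import hashlib


def md5_hex(value: str) -> str:
    """Return the MD5 hex digest of a string (assumes UTF-8 encoding)."""
    return hashlib.md5(value.encode("utf-8")).hexdigest()


# Trimmed stand-in for the campaign record shown above.
record = {"from": {"name": "John Doe", "email_address": "john.doe@gmail.com"}}

# Overwrite the nested field with its hash, like an "add" transformation
# with the path "from, name".
record["from"]["name"] = md5_hex(record["from"]["name"])
```

The field keeps its position in the record but now holds a 32-character hex string instead of the original name.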

Another common use case of the "add" transformation is the enriching of records with their parent resource - check out the partitioning documentation for more details.

It's not recommended to use this feature to do projections (like concatenating firstname and lastname into a single "name" field) - in most cases it's beneficial to leave these tasks to a later stage in the data pipeline.

Meta data

Besides bringing the records in the right shape, it's important to communicate some pieces of meta data about records to the downstream system so they can be handled properly.

Primary key

The "Primary key" field specifies how to uniquely identify a record. This is important for downstream de-duplication of records (e.g. by the incremental sync - Append + Deduped sync mode).

In a lot of cases, like for the EmailOctopus example from above, there is a dedicated id field that can be used for this purpose. It's important that the value of the id field is guaranteed to only occur once for a single record.

In some cases there is no such field but a combination of multiple fields is guaranteed to be unique, for example the shipping zone locations of the Woocommerce API do not have an id, but each combination of the code and type fields is guaranteed to be unique:

[
  {
    "code": "BR",
    "type": "country"
  },
  {
    "code": "DE",
    "type": "country"
  }
]

In this case, the "Primary key" can be set to "code, type" to allow automatic downstream deduplication of records based on the value of these two fields.
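
The effect of a composite primary key on deduplication can be illustrated with a small Python sketch. The actual deduplication happens downstream in the destination; the function below is only a hypothetical model of it, and the third input record is an invented repeat added to show the effect.

```python
from typing import Any, Dict, List, Tuple


def dedupe(
    records: List[Dict[str, Any]], primary_key: List[str]
) -> List[Dict[str, Any]]:
    """Keep one record per primary key value, preferring the latest one."""
    latest: Dict[Tuple[Any, ...], Dict[str, Any]] = {}
    for record in records:
        # The key is the combination of all primary key fields.
        key = tuple(record[field] for field in primary_key)
        latest[key] = record
    return list(latest.values())


locations = [
    {"code": "BR", "type": "country"},
    {"code": "DE", "type": "country"},
    {"code": "BR", "type": "country"},  # hypothetical repeat from a later sync
]

unique_locations = dedupe(locations, ["code", "type"])
```

Using only code or only type as the key would not be enough here, since neither field is guaranteed to be unique on its own.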

Declared schema

Similar to the "Primary key", the "Declared schema" defines how the records will be shaped via a JSON Schema definition. It defines which fields and nested fields occur in the records, whether they are always available or sometimes missing and which types they are.

This information is used by the Airbyte system for different purposes:

  • Column selection when configuring a connection - in Airbyte cloud, the declared schema allows the user to pick which columns/fields are passed to the destination to dynamically reduce the amount of synced data
  • Recreating the data structure with right columns in destination - this allows a warehouse destination to create a SQL table with the columns matching the fields of records
  • Detecting schema changes - if the schema of a stream changes for an existing connection, this situation can be handled gracefully by Airbyte instead of causing errors in the destination

When doing test reads, the connector builder analyzes the test records and shows the derived schema in the "Detected schema" tab. By default, new streams are configured to automatically import the detected schema into the declared schema on every test read. This behavior can be toggled off by disabling the Automatically import detected schema switch, in which case the declared schema can be manually edited in the UI and is no longer updated automatically when triggering test reads.

For example the following test records:

[
  {
    "id": "40205efe-5f94-11ed-aa11-7d1ac831a909",
    "status": "SENT",
    "subject": "Hello from Integration Test",
    "created_at": "2022-11-08T18:36:25+00:00",
    "sent_at": "2022-11-08T18:36:55+00:00"
  },
  {
    "id": "91546616-5ef0-11ed-90c7-fbeacb2ee1eb",
    "status": "SENT",
    "subject": "Hello my first campaign",
    "created_at": "2022-11-07T23:04:44+00:00",
    "sent_at": "2022-11-08T12:48:27+00:00"
  }
]

result in the following schema:

{
  "$schema": "http://json-schema.org/schema#",
  "properties": {
    "created_at": {
      "type": "string"
    },
    "id": {
      "type": "string"
    },
    "sent_at": {
      "type": "string"
    },
    "status": {
      "type": "string"
    },
    "subject": {
      "type": "string"
    }
  },
  "type": "object"
}

A stricter schema is always better, but the detected schema is a good default to rely on. See the documentation about supported data types for the JSON schema structures that will be picked up by the Airbyte system.
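
How a schema can be derived from test records is sketched below in Python. This is a deliberately simplified illustration, not the connector builder's detection logic: it only looks at top-level fields, treats all numbers as "number", and the function names are assumptions made for this example.

```python
from typing import Any, Dict, List


def json_type(value: Any) -> str:
    """Map a Python value to a JSON Schema type name (simplified)."""
    if value is None:
        return "null"
    if isinstance(value, bool):
        return "boolean"
    if isinstance(value, (int, float)):
        return "number"
    if isinstance(value, str):
        return "string"
    if isinstance(value, list):
        return "array"
    return "object"


def infer_schema(records: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Collect the types seen for each top-level field across all records."""
    seen: Dict[str, List[str]] = {}
    for record in records:
        for field, value in record.items():
            types = seen.setdefault(field, [])
            type_name = json_type(value)
            if type_name not in types:
                types.append(type_name)
    properties = {
        field: {"type": types[0] if len(types) == 1 else types}
        for field, types in sorted(seen.items())
    }
    return {"type": "object", "properties": properties}


test_records = [
    {"id": "40205efe-5f94-11ed-aa11-7d1ac831a909", "status": "SENT",
     "subject": "Hello from Integration Test",
     "created_at": "2022-11-08T18:36:25+00:00",
     "sent_at": "2022-11-08T18:36:55+00:00"},
    {"id": "91546616-5ef0-11ed-90c7-fbeacb2ee1eb", "status": "SENT",
     "subject": "Hello my first campaign",
     "created_at": "2022-11-07T23:04:44+00:00",
     "sent_at": "2022-11-08T12:48:27+00:00"},
]

schema = infer_schema(test_records)
```

Because the result depends entirely on the records that were read, a field that happens to be null or missing in the test data ends up with a type that is narrower than what the API can return.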

If Automatically import detected schema is disabled, and the declared schema deviates from the detected schema, the "Detected schema" tab in the testing panel highlights the differences. It's important to note that differences are not necessarily a problem that needs to be fixed - in some cases the currently loaded set of records in the testing panel doesn't feature all possible cases so the detected schema is too strict. However, if the declared schema is incompatible with the detected schema based on the test records, it's very likely there will be errors when running syncs.

Detected schema with highlighted differences

In the case of the example above, there are two differences between detected and declared schema. The first difference for the name field is not problematic:

  "name": {
-   "type": [
-     "string",
-     "null"
-   ]
+   "type": "string"
  },

The declared schema allows the null value for the name while the detected schema only encountered strings. If it's possible that the name is set to null, the declared schema is configured correctly.

The second difference will likely cause problems:

  "subject": {
-   "type": "number"
+   "type": "string"
  }

The subject field was detected as string, but is configured to be a number in the declared schema. As the API returned string subjects during testing, it's likely this will also happen during syncs which would render the declared schema inaccurate. Depending on the situation this can be fixed in multiple ways:

  • If the API changed and subject is always a string now, the declared schema should be updated to reflect this: "subject": { "type": "string" }
  • If the API is sometimes returning subject as number or string depending on the record, the declared schema should be updated to allow both data types: "subject": { "type": ["string","number"] }
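
The difference between the two fixes can be checked with a small type-matching sketch in Python. It covers only a handful of JSON Schema types and is not how Airbyte validates records; the function name matches_type is hypothetical.

```python
from typing import Any, List, Union


def matches_type(value: Any, declared: Union[str, List[str]]) -> bool:
    """Check a value against a declared "type", given as a name or a list of names."""
    allowed = [declared] if isinstance(declared, str) else declared
    checks = {
        "string": lambda v: isinstance(v, str),
        # bool is a subclass of int in Python, so exclude it explicitly.
        "number": lambda v: isinstance(v, (int, float)) and not isinstance(v, bool),
        "boolean": lambda v: isinstance(v, bool),
        "null": lambda v: v is None,
    }
    # Type names this sketch does not know about never match.
    return any(checks.get(name, lambda v: False)(value) for name in allowed)


subject = "Bar"

fits_old_schema = matches_type(subject, "number")
fits_string_schema = matches_type(subject, "string")
fits_union_schema = matches_type(subject, ["string", "number"])
```

With the original declaration the string subject does not fit, while both the narrowed and the widened declaration accept it; the widened one additionally keeps accepting numeric subjects.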

A common situation is that certain record fields do not have any values in the test read data, so they are set to null. In the detected schema, these fields are of type "null", which is most likely not correct for all cases. In these situations, the declared schema should be manually corrected.