12 March 2019

IoT Data Validation Through Credits Blockchain Platform

 

1 Introduction

The Internet of Things (IoT) is changing how modern businesses operate. The ability to collect real-time data lets businesses automate processes, increase productivity and improve customer service. However, one of the most difficult tasks for organizations using the IoT is ensuring an adequate level of security. One solution is a new model of digital interaction that combines blockchain technology with the IoT. Integrating these technologies, however, requires high transaction processing speed.

The model presented here, which involves the IBM Watson infrastructure, IBM Cloud and the Credits blockchain platform, addresses these problems. The prototype makes IoT readings tamper-evident and publicly verifiable while providing the speed required to register a huge number of readings in a public blockchain; the efficiency, speed and capacity of the Credits platform make this feasible. The model is applicable to industries and market segments where high-quality data collection from sensors and gateways, and consensus on that data, are vital for subsequent transactions between market participants. It opens up a large market of opportunities and demonstrates the value of IoT and blockchain for business, supporting their subsequent mass adoption.

The solution architecture is shown in the figure below.

IoT devices transmit metrics to the IBM IoT platform over the MQTT protocol. The received data is saved in IBM storage (Cloudant) in dedicated areas (buckets). At fixed intervals a new bucket is created and incoming data is written to it. Once a bucket is full, its hash is calculated and recorded in the blockchain.

You can look up the hash recorded in the blockchain for each bucket and compare it with the hash of the bucket's current contents. If the calculated hash matches the one in the blockchain, the data is reliable and has not been changed. A missing bucket or a hash mismatch indicates that the data was changed and cannot be trusted.

The following IBM Cloud services are used:

  • IBM IoT Cloud
  • IBM Watson IoT Platform
  • NodeRed
  • Cloudant

A Raspberry Pi that reports its CPU temperature is used as the IoT data source.

2 Platform Configuration to Process and Store IoT Data

You can deploy the platform for processing IoT data with the Starter Kit (boilerplate) available at https://console.bluemix.net/catalog/starters/internet-of-things-platform-starter. It includes the SDK for Node.js, Cloudant and the Internet of Things Platform.

We will use the free version to process the data.

To prepare the infrastructure, fill in the following fields:

  • App name: creditscloud
  • Host Name:
  • Domain: eu-gb.mybluemix.net
  • Choose a region/location to deploy in: London
  • Choose an organization: sk@credits.com
  • Choose a space: dev
  • SDK for Node.js: Default
  • Cloudant: Lite
  • Internet of Things Platform

Once all fields are filled in, press the “Create” button and wait for the necessary infrastructure to be created.

2.1 Configuration of IBM Watson IoT Platform

Configuring the platform mainly consists of registering the device and configuring data storage in Cloudant.

For detailed device registration instructions, follow the link: https://developer.ibm.com/recipes/tutorials/how-to-register-devices-in-ibm-iot-foundation/

Once the device is registered, you will need the following data to connect the Raspberry Pi:

  • Device Type
  • Device ID
  • Organization ID
  • Authentication Token

Using the Authentication Token allows the physical IoT device to establish a more secure connection over the MQTT protocol.
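To illustrate how the four values listed above are typically combined, here is a minimal sketch that only builds the connection settings an MQTT client would need; it performs no network I/O. The function name, the sample values and the event name "status" are hypothetical. The host name pattern, the "use-token-auth" user name and the topic layout follow the usual Watson IoT Platform device conventions and should be checked against the current IBM documentation.

```python
def mqtt_connection_params(org_id, device_type, device_id, auth_token,
                           event_id="status"):
    """Build MQTT settings for a registered device (hypothetical helper)."""
    return {
        # Assumption: standard Watson IoT messaging host for the organization
        "host": f"{org_id}.messaging.internetofthings.ibmcloud.com",
        "port": 8883,  # MQTT over TLS
        # Device client IDs have the form d:<org>:<type>:<id>
        "client_id": f"d:{org_id}:{device_type}:{device_id}",
        # Token authentication uses a fixed user name and the token as password
        "username": "use-token-auth",
        "password": auth_token,
        # Devices publish events to iot-2/evt/<event>/fmt/<format>
        "topic": f"iot-2/evt/{event_id}/fmt/json",
    }


if __name__ == "__main__":
    # Placeholder values, not real credentials
    params = mqtt_connection_params("abc123", "raspberrypi", "pi-01", "token")
    print(params["client_id"])
    print(params["topic"])
```

The resulting dictionary could be handed to any MQTT client library; keeping it free of I/O makes the settings easy to inspect before a device is connected.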

2.2 Configuration of NodeRed on Node.js

NodeRed is an open-source tool that provides a convenient graphical interface for developing interaction rules for IoT devices. It acts as an intermediary that connects different types of devices with each other and/or with a cloud or a DBMS. NodeRed runs on the Node.js platform and lets users adjust the necessary settings in a browser.

NodeRed in the cloud is available right after the Starter Kit is deployed.

Following the link https://creditscloud.eu-gb.mybluemix.net/ opens the NodeRed flow editor.

The scheme below shows how temperature data is received from the IBM IoT Platform and, after processing, saved in the Cloudant NoSQL storage.

During data processing, the system compares the received value with a threshold and assigns the message the status “safe” or “danger”.
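The threshold check can be sketched as follows. In the described setup this logic lives in a NodeRed flow; the Python version below only illustrates the rule. The threshold of 40 degrees, the "temp" field name and the function names are assumptions, since the text does not give them.

```python
THRESHOLD_C = 40.0  # hypothetical threshold; the text does not state the value


def classify(temp_c, threshold_c=THRESHOLD_C):
    """Return "danger" when the reading exceeds the threshold, else "safe"."""
    return "danger" if temp_c > threshold_c else "safe"


def annotate(message, threshold_c=THRESHOLD_C):
    """Return a copy of the message with a "status" field added."""
    return {**message, "status": classify(message["temp"], threshold_c)}


if __name__ == "__main__":
    print(annotate({"temp": 38.5}))
    print(annotate({"temp": 55.0}))
```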

2.3 Configuration of Cloudant storage

To access the Cloudant storage, create credentials in the Service Credentials tab. This generates a “login”, a “password” and a “URL” that give you access to the storage through the REST API.

You have to configure Extensions to save data in Cloudant in separate structures.

The figure above shows the data being saved in a DB named “measurement”, with a new bucket created automatically every day to store metrics.

2.4 Configuration and connection of Raspberry Pi

A single-board Raspberry Pi 3 Model B running Raspbian OS is used as the data source. A Raspberry Pi can serve both as an independent source of IoT signals and as an IoT gateway connecting all IoT devices available in a smart house.

The Raspberry Pi sends its CPU temperature to the Watson IoT platform over the MQTT protocol every 5 seconds. An Internet connection must be established and configured on the single-board computer in advance.
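In this document the readings are produced by a NodeRed flow on the device. As a rough illustration of what such a flow does, the sketch below parses a CPU temperature and wraps it in a JSON payload. The sysfs path, the "d"/"temp" payload layout and all function names are assumptions and may differ from the actual flow.

```python
import json
from pathlib import Path

# Assumption: on Raspbian the CPU temperature is exposed here in millidegrees Celsius
THERMAL_FILE = Path("/sys/class/thermal/thermal_zone0/temp")


def parse_millidegrees(raw):
    """Convert a sysfs reading such as "48312\\n" to degrees Celsius."""
    return int(raw.strip()) / 1000.0


def read_cpu_temp(path=THERMAL_FILE):
    """Read the current CPU temperature from the given file."""
    return parse_millidegrees(path.read_text())


def build_payload(temp_c):
    """Wrap the reading in a JSON document (hypothetical layout)."""
    return json.dumps({"d": {"temp": round(temp_c, 1)}})


if __name__ == "__main__":
    # Only read the sensor file when it actually exists on this machine
    if THERMAL_FILE.exists():
        print(build_payload(read_cpu_temp()))
```

A sender would call `read_cpu_temp()` in a loop with a 5-second pause and publish each payload over MQTT.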

Execute the following commands on the Raspberry Pi:

sudo apt-get update

sudo apt-get install nodered

Once NodeRed is installed, start it with the following command:

node-red-start

Right after NodeRed starts, open the following link in a browser: http://{IP_address_raspberry}:1880/.

Then open Menu -> Import -> Clipboard, paste the JSON available at the link below into the workspace and press the “Deploy” button.

As soon as you complete these steps, the Raspberry Pi will send its CPU temperature to the Watson platform every 5 seconds.

Then, to process the data, you have to connect the device as one registered in the Watson IoT Platform. In the NodeRed window, double-click the “Event” node.

In the “Connection” field, select “Registered”.

Press the “Edit” icon in the “Credentials” field.

Fill in the fields with the values from section 2.1:

  • Organization
  • Device Type
  • Device ID
  • Auth Token

Then press the “Add” button, followed by “Deploy”.

3 Integration with blockchain

Install the necessary modules:

$ sudo apt install python3-pip

$ pip3 install base58check

This module is used to decode hashes in Base58 encoding.

$ pip3 install ed25519

The ed25519 digital signature algorithm is used to sign transactions.

$ pip3 install colorful

$ pip3 install tqdm

These two modules highlight console text with colour and display a progress bar while the data received from Cloudant is processed.

$ pip3 install thrift

This module is used to interact with a node over the Thrift protocol.

$ pip3 install cloudant

This module is used to connect to the Cloudant storage.

3.1 Confirmation of connection with Cloudant

The code below checks whether the connection with Cloudant was successful:

from cloudant.client import Cloudant
from cloudant.error import CloudantException
from cloudant.result import Result, ResultByKey

# connect=True opens the session immediately, so wrong credentials fail here
client = Cloudant("login", "password", url="url", connect=True)

“login”, “password” and “url” are the details obtained in section 2.3. If these commands execute without errors, the connection has been established.

3.2 Request of data from Cloudant

Data is requested from Cloudant with the following code:

# cloudant_client is a connected Cloudant client (see section 3.1)
params = {'include_docs': 'true'}
end_point = '{0}/{1}/_all_docs'.format(cloudant_client.server_url, bucket_name)
response = cloudant_client.r_session.get(end_point, params=params)

The “bucket_name” variable must hold the name of the requested bucket. This request downloads all data from that bucket.

Cloudant may return the data in a different order on each request, so the data should be sorted (e.g. in ascending order) before the bucket’s hash is calculated.
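One way to make the hash independent of the order in which rows arrive is to sort them and serialize them canonically before hashing. The sketch below assumes each row is a dictionary with an "id" field, as in an _all_docs response, and uses SHA-1 as in the appendix. The function name is hypothetical, and the canonical JSON form used here is not byte-compatible with the serialization in the appendix, so the two would produce different hash values.

```python
import hashlib
import json


def bucket_hash(rows):
    """Hash a list of row dictionaries independently of their order."""
    # Sort rows by document id so the input order does not matter
    ordered = sorted(rows, key=lambda row: row["id"])
    # sort_keys and fixed separators give one canonical text per content
    canonical = json.dumps(ordered, sort_keys=True, separators=(",", ":"))
    return hashlib.sha1(canonical.encode("utf-8")).hexdigest()


if __name__ == "__main__":
    rows = [{"id": "b", "doc": {"temp": 41.2}}, {"id": "a", "doc": {"temp": 40.8}}]
    # Both orderings print the same digest
    print(bucket_hash(rows))
    print(bucket_hash(list(reversed(rows))))
```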

3.3 Smart Contract Deployment

The smart contract stores “name:hash” pairs. Its code is shown below:

import java.util.HashMap;
import java.util.Map;

public class Contract extends SmartContract {

   private String valueDetector = "";

   // Maps a bucket name to the hash recorded for it
   private HashMap<String, String> hashesOfBlocks = new HashMap<>();

   public String getValueDetector() {
       return valueDetector;
   }

   public void setValueDetector(String valueDetector) {
       this.valueDetector = valueDetector;
   }

   // Stores one "name:hash" pair
   public void addBlock(String name, String hash) {
       hashesOfBlocks.put(name, hash);
   }

   // Returns the hash recorded for the given bucket name, or null if absent
   public String getBlock(String name) {
       return hashesOfBlocks.get(name);
   }

   // Returns all pairs in HashMap.toString() form: {name1=hash1, name2=hash2}
   public String getAllBlocks() {
       return hashesOfBlocks.toString();
   }
}


The most convenient way to deploy the smart contract is to upload it via the Wallet. To do this, open the “Smart Contracts” tab and click “Deploy new smart contract”.

We recommend copying the smart contract code from a plain-text editor (e.g. Notepad). To compile the source code, press the “Build” button; the code is then compiled into byte code. After you press the “Deploy” button, a deploy transaction is formed, signed with the wallet’s EDS (electronic digital signature) and sent to the p2p network. Once the deploy transaction is validated, it is recorded in the blockchain.

The smart contract can be executed from the wallet by clicking “Smart Contracts” and then “Execute”.

3.4 Formation of a Transaction to call smart contract method

A transaction is formed in several steps:

  1. Filling smart contract fields in API Thrift format
  2. Serialization of USERFIELD for smart contract execution
  3. Filling all transaction fields except EDS
  4. Serialization of a transaction by using API Thrift
  5. Combination of serialized transaction and USERFIELD
  6. Adding EDS to the transaction

 

The following code prepares the call of a smart contract method with the parameters “params”:

   # SmartContractInvocation (USERFIELDS)
   tr.smartContract = SmartContractInvocation()
   tr.smartContract.method = method
   tr.smartContract.params = params
   tr.smartContract.forgetNewState = False

 

The code below serializes the smart contract invocation:

   # Serialize SmartContractInvocation() with Thrift
   transportOut = TTransport.TMemoryBuffer()
   protocolOut = TBinaryProtocol.TBinaryProtocol(transportOut)
   tr.smartContract.write(protocolOut)
   serial_smart = transportOut.getvalue()

 

The Python code below shows the structure of a transaction:

serial_transaction = pack('=6s32s32slqhbbi',  # '=' - standard sizes, no alignment
   lastInnerId,         # 6s  - 6 byte InnerID (char[] C type)
   tr.source,           # 32s - 32 byte source public key (char[] C type)
   tr.target,           # 32s - 32 byte target public key (char[] C type)
   tr.amount.integral,  # l   - 4 byte integer (long C type, 4 bytes with '=')
   tr.amount.fraction,  # q   - 8 byte integer (long long C type)
   tr.fee.commission,   # h   - 2 byte integer (short C type)
   tr.currency,         # b   - 1 byte integer (signed char C type)
   1,                   # b   - 1 byte userfield_num
   len(serial_smart))   # i   - 4 byte userfield_len
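To make the layout easier to check, the following sketch packs the fixed-size part of a transaction with placeholder values and reports its size. The helper name and the sample values are hypothetical; the format string is the one used above. With '=' the struct module uses standard sizes and inserts no padding, so the size is simply the sum of the field widths.

```python
from struct import calcsize, pack, unpack

# Format string from the text: standard sizes, no padding between fields
HEADER_FORMAT = "=6s32s32slqhbbi"


def pack_header(inner_id, source, target, userfield_len,
                integral=0, fraction=0, commission=0, currency=1):
    """Pack the fixed-size part of a transaction (hypothetical helper)."""
    return pack(
        HEADER_FORMAT,
        inner_id.to_bytes(6, "little"),  # 6s: inner ID as 6 little-endian bytes
        source,                          # 32s: source public key
        target,                          # 32s: target public key
        integral,                        # l: 4 bytes
        fraction,                        # q: 8 bytes
        commission,                      # h: 2 bytes
        currency,                        # b: 1 byte
        1,                               # b: number of user fields
        userfield_len,                   # i: length of the serialized user field
    )


# Placeholder keys: 32 zero bytes stand in for real public keys
header = pack_header(inner_id=1, source=bytes(32), target=bytes(32),
                     userfield_len=17)
print(calcsize(HEADER_FORMAT), len(header))
print(unpack(HEADER_FORMAT, header)[-1])
```

The fixed part occupies 90 bytes (6 + 32 + 32 + 4 + 8 + 2 + 1 + 1 + 4), and the last unpacked field returns the user field length that was passed in.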

 

Then the serialized parts are combined and signed:

# Add serialized SmartContractInvocation
full_serial_transaction = serial_transaction + serial_smart

# Decode the wallet private key from Base58
keydata = base58check.b58decode(walletpriv)

# Create the ed25519 signing key object
signing_key = ed25519.SigningKey(keydata)

# Compute the digital signature of the serialized transaction
sign = signing_key.sign(full_serial_transaction)

# Write the EDS to the corresponding transaction field
tr.signature = sign

 

Use the “TransactionFlow()” function to start smart contract execution:

res = api.TransactionFlow(tr)

3.5 Recording the Data to the Blockchain

A script that adds the data to the blockchain is launched on a schedule. The launch period should match the period for which data is stored in a single bucket. For instance, if a new bucket is created once an hour, the script should be launched once an hour as well.
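A scheduled run should only hash a bucket that is no longer being written to. Assuming daily buckets named with the prefix and date format that appear in the appendix, the most recently completed bucket could be chosen as sketched below; the function names are hypothetical.

```python
from datetime import date, timedelta

# Prefix and date format as they appear in the appendix
BUCKET_PREFIX = "iotp_hmoyaf_measurements_"


def bucket_name(day):
    """Return the bucket name for the given calendar day."""
    return BUCKET_PREFIX + day.strftime("%Y-%m-%d")


def last_completed_bucket(today):
    """Return yesterday's bucket, which is no longer being filled."""
    return bucket_name(today - timedelta(days=1))


if __name__ == "__main__":
    print(last_completed_bucket(date.today()))
```

For a run on 12 March 2019 this selects the bucket for 11 March 2019.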

To add a “name:hash” pair, the script calls the smart contract method “addBlock”: a special transaction for execution is formed (all necessary API Thrift fields are filled), signed and sent to the Executor.

3.6 Blockchain Validator

The code provided in the appendix operates according to the following algorithm:

  1. Establishing the connection with the blockchain via Thrift API
  2. Extracting all pairs “name:hash” from the blockchain
  3. Receiving corresponding buckets from Cloudant by using “name” as a key, calculating their hashes and comparing them with the values from the blockchain
  4. If the hashes match, “OK” is displayed. If they do not match, the data has been tampered with and the warning “MODIFIED” is displayed. If the corresponding bucket is not found in Cloudant, the error “REMOVED” is displayed, indicating that the data was deleted.
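Steps 2 to 4 can be sketched without any network access. The example assumes the contract returns its pairs in Java's HashMap.toString() form, "{name1=hash1, name2=hash2}", and that the Cloudant hashes have already been computed into a dictionary. Function names and sample values are hypothetical.

```python
def parse_contract_map(text):
    """Parse "{name1=hash1, name2=hash2}" into a dictionary."""
    inner = text.strip()[1:-1]  # drop the surrounding braces
    if not inner:
        return {}
    return dict(item.split("=", 1) for item in inner.split(", "))


def validate(chain, cloud):
    """Return a status per bucket name recorded in the blockchain."""
    report = {}
    for name in sorted(chain):
        if name not in cloud:
            report[name] = "REMOVED"   # bucket no longer exists in storage
        elif cloud[name] != chain[name]:
            report[name] = "MODIFIED"  # contents changed after recording
        else:
            report[name] = "OK"
    return report


if __name__ == "__main__":
    chain = parse_contract_map("{day1=aa11, day2=bb22, day3=cc33}")
    cloud = {"day1": "aa11", "day2": "ffff"}
    for name, status in validate(chain, cloud).items():
        print(name, status)
```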

3.7 Solution Testing

To test this solution, log in to Cloudant and change the value of a parameter in any bucket. When you start the validation process, the window shown below opens:

The first column is the bucket name and the second is the bucket hash stored in the blockchain. The warning “MODIFIED” is displayed for buckets whose hashes do not match those stored in the blockchain. If the corresponding bucket is not found in the storage, the error “REMOVED” is displayed.

 

Appendix 1. Full code of the validator

#!/usr/bin/env python3

import sys

import glob

import base58check

import json

import hashlib

from datetime import datetime, timedelta

import time             #time.sleep between Cloudant requests

import ed25519          #Signing transaction

from struct import *    #Serialize struct

import collections

import colorful

from tqdm import tqdm

 

sys.path.append('gen-py')

 

from NodeApi import API

from executor import ContractExecutor

from general.ttypes import Variant

from NodeApi.ttypes import Transaction, TransactionFlowResult, Amount, AmountCommission, SmartContractInvocation, Pool, PoolListGetResult, StatsGetResult, PeriodStats

 

from thrift import Thrift

from thrift.transport import TSocket

from thrift.transport import TTransport

from thrift.protocol import TBinaryProtocol

 

from cloudant.client import Cloudant

from cloudant.error import CloudantException

from cloudant.result import Result, ResultByKey

from requests.adapters import HTTPAdapter



walletpriv='Wallet private key'

walletpub='Wallet public key'

smartpub='Smart contract public key'



def cloudant_setup():

   httpAdapter = HTTPAdapter(pool_connections=15, pool_maxsize=100)

   client = Cloudant("Cloudant_login",

                 "Cloudant_password",

                 url="Cloudant_URL",

                 connect=True,

                 auto_renew=True,

                 timeout=300,

                 adapter=httpAdapter)

   return client

 

def smart_exec(api, wallet_id, smart_id, method, params):

   # Get last Inner ID

   wallet = api.WalletTransactionsCountGet(base58check.b58decode(wallet_id))

   lastInnerId = bytearray((wallet.lastTransactionInnerId + 1).to_bytes(6,'little'))

   # Get smartcontract bytecode

   smart = api.SmartContractGet(base58check.b58decode(smart_id))

 

   tr = Transaction()

   tr.id = int.from_bytes(lastInnerId,byteorder='little', signed=False)

   tr.source = base58check.b58decode(wallet_id)

   tr.target = base58check.b58decode(smart_id)

 

   # Amount

   tr.amount = Amount()

   tr.amount.integral = 0

   tr.amount.fraction = 0

   # Balance

   tr.balance = Amount()

   tr.balance.integral = 0

   tr.balance.fraction = 0

   tr.currency = 1

   # Fee

   tr.fee = AmountCommission()

   tr.fee.commission = 0

   # SmartContractInvocation (USERFIELDS)

   tr.smartContract = SmartContractInvocation()

   tr.smartContract.method = method

   tr.smartContract.params = params

   tr.smartContract.forgetNewState = False

 

   # Serialize SmartContractInvocation() by Thrift

   transportOut = TTransport.TMemoryBuffer()

   protocolOut = TBinaryProtocol.TBinaryProtocol(transportOut)

   tr.smartContract.write(protocolOut)

   serial_smart = transportOut.getvalue()

 

   serial_transaction = pack('=6s32s32slqhbbi',  #'=' - standard sizes, no alignment

                      lastInnerId, #6s - 6 byte InnerID (char[] C Type)

                      tr.source, #32s - 32 byte source public key (char[] C Type)

                      tr.target, #32s - 32 byte target public key (char[] C Type)

                      tr.amount.integral, #l - 4 byte integer (long C Type, 4 bytes with '=')

                      tr.amount.fraction, #q - 8 byte integer(long long C Type)

                      tr.fee.commission, #h - 2 byte integer (short C Type)

                      tr.currency, #b - 1 byte integer (signed char C Type)

                      1, #b - 1 byte userfield_num

                      len(serial_smart)) #i - 4 byte userfield_len

 

   # Add serialized SmartContractInvocation

   full_serial_transaction = serial_transaction + serial_smart

   # Calculate signing

   keydata = base58check.b58decode(walletpriv)

   # Create object ed25519

   signing_key = ed25519.SigningKey(keydata)

   # Receive digital signature msg

   sign = signing_key.sign(full_serial_transaction)

   tr.signature = sign

 

   res = api.TransactionFlow(tr)

 

def node_setup():

  # Make socket

   transport = TSocket.TSocket('IP_CLIENT', 9090)

   transport_exec = TSocket.TSocket('IP_EXECUTOR', 9080)

 

   # Buffering is critical. Raw sockets are very slow

   transport = TTransport.TBufferedTransport(transport)

   transport_exec = TTransport.TBufferedTransport(transport_exec)

   # Wrap in a protocol'

   protocol = TBinaryProtocol.TBinaryProtocol(transport)

   protocol_exec = TBinaryProtocol.TBinaryProtocol(transport_exec)

 

   # Create a client to use the protocol encoder

   client = API.Client(protocol)

   client_exec = ContractExecutor.Client(protocol_exec) #running get method

 

   # Connect!

   transport.open()

   transport_exec.open()

 

   return client, client_exec



def main():

   # setup all clients: api, executor, cloudant

   client, client_exec = node_setup()

   cloudant_client = cloudant_setup()

 

   # get all storage block from cloudant

   listStorageName = []

   for k in cloudant_client.all_dbs():

       if 'iotp_hmoyaf_measurements_2' in str(k):

           listStorageName.append(k)

 

   curr_day = datetime.strftime(datetime.now(), '%Y-%m-%d')

   next_day = datetime.strftime(datetime.now() + timedelta(days=1), '%Y-%m-%d')

 

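   # do not compare hashes for curr_day and next_day: these buckets are still being filled

   for day in (curr_day, next_day):

       name = 'iotp_hmoyaf_measurements_' + day

       # list.remove() raises ValueError for a missing item, so check first

       if name in listStorageName:

           listStorageName.remove(name)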

 

   # Run addBlock method for SmartContract

   params = {'include_docs': 'true'}

   # mparams = [Variant(),Variant()]

   # method = 'addBlock'

   dictCloud = {}

   for k in tqdm(listStorageName):

       start = datetime.now()

       end_point = '{0}/{1}/_all_docs'.format(cloudant_client.server_url, k)

       response = cloudant_client.r_session.get(end_point, params=params)

       a = json.loads(response.text)

       sha1 = hashlib.sha1()

       sha1.update(json.dumps(a['rows']).encode())

       sig = sha1.digest().hex()

       dictCloud.update({k:sig})

       period = datetime.now() - start

       tqdm.write(f"{a['total_rows']} docs in {k} storage with {sig} hash in {period}")

       time.sleep(1)




   # Obtain all pairs from the smart contract

   # Run the smart contract via the executor

   wallet = client.WalletTransactionsCountGet(base58check.b58decode(walletpub))

 

   # Get bytecode from  smartcontract

   smarttmp = client.SmartContractGet(base58check.b58decode(smartpub))

 

   # Execute smartcontract method

   res = client_exec.executeByteCode(base58check.b58decode(smartpub),

                                     smarttmp.smartContract.smartContractDeploy.byteCode,

                                     smarttmp.smartContract.objectState,

                                     'getAllBlocks',

                                     [],

                                     10000)

 

   # remove the first and last characters (the braces of HashMap.toString()) from the string

   string = res.ret_val.v_string[1:-1]

 

   # parse string to Dict (block_name: hash)

   dictChain = dict(x.split('=') for x in string.split(', '))

 

   # print head of table

   print(f"\n{colorful.bold_white}{'BLOCK NAME':^40}  {'HASH':^40} " + f"{colorful.bold_white}{'STATUS':<10}\n")

 

   # compare hash from smart and from cloudant

   for key,val in collections.OrderedDict(sorted(dictChain.items())).items():

       try:

           if (dictChain[key] != dictCloud[key]):

               print(f"{colorful.white}{key:<40}: {val:<40} - " + f"{colorful.yellow}{'MODIFIED':<10}")

           else:

               print(f"{colorful.white}{key:<40}: {colorful.italic}{val:<40} - " + f"{colorful.green}{'OK':<10}")

       except KeyError:

           print(f"{colorful.white}{key:<40}: {val:<40} - "+ f"{colorful.red}{'REMOVED':<10}")

   print(f'{colorful.white}.')

 

# Close!

#    transport.close()

#    transport_exec.close()



if __name__ == '__main__':

   try:

       main()

   except Thrift.TException as tx:

       print('%s' % tx.message)
