IoT Data Validation Through Credits Blockchain Platform
1 Introduction
Innovative technology IOT is changing the world of modern business perception. The ability to collect real-time data provides businesses with a number of benefits, allowing to automate processes, increase productivity and improve customer service. However, one of the most difficult tasks for organizations using the Internet of Things is to provide a proper level of security. The solution is a new model of digital interactions, including blockchain technology and IOT. At the same time, the integration of this bunch of technologies requires high transaction processing speed.
Thereby, the presented model, which involves IBM Watson infrastucture, IBM clouds and Credits blockchain platform, is the most perfect way to solve existing problems. The presented prototype is the best approach to bring a hack-proof of IoT indications in the public view along with the speed, which is required for registration of the huge number of indications into a public blockchain. Thanks to the efficiency, speed and capacity of the Credits platform it can be performed now. Developed model is applicable for industries/market segments where high quality data collection from sensors/gateways and reaching consensus is vital for the further transactions between market participants. This model opens a huge market of prospects, which allow to transform the modern world shape, proving the value of IOT and blockchain for business with their subsequent mass adoption.
The solution architecture is displayed on the figure below.
IoT devices transmit metrics on the IBM IoT platform with MQTT protocol. Received data are saved in IBM Storage (Cloudant) in special areas (buckets). With certain intervals, new buckets are created with the data being written into them. A hash is calculated for the filled buckets and then recorded to the blockchain.
You can find a unique hash for each bucket in the blockchain and compare it with the current one. In case if calculated hash and hash in a blockchain match, this indicates that the data is reliable and was not changed. The lack of the bucket or mismatch of the hash indicates that the data was changed and cannot be reliable.
Following solutions provided by IBM Cloud are used:
- IBM IoT Cloud
- IBM Watson IoT Platform
- NodeRed
- Cloudant
Raspberry Pi, which sends its CPU temperature, is used as IoT data source.
2 Platform Configuration to Process and Store IoT Data
You can deploy the platform to process IoT data with Starter Kit (boilerplate) available by the link https://console.bluemix.net/catalog/starters/internet-of-things-platform-starter. It includes SDK for Node.js, Cloudant and Internet of Things platform.
We will use free version to process the data.
To prepare the infrastructure, following fields have to be filled up:
- App name: creditscloud
- Host Name:
- Domain: eu-gb.mybluemix.net
- Choose a region/location to deploy in: London
- Choose an organization: sk@credits.com
- Choose a space: dev
- SDK for Node.js: Default
- Cloudant: Lite
- Internet of Things Platform
Once everything is filled, press the “Create” button and wait for the creation of necessary infrastructure.
2.1 Configuration of IBM Watson IoT Platform
Main configuration of the platform comes to device registration and configuration of data storage in Cloudant.
To get an access to more detailed instruction of device registration, follow the link: https://developer.ibm.com/recipes/tutorials/how-to-register-devices-in-ibm-iot-foundation/
Once device is registered you’ll need the following data to connect Raspberry Pi:
- Device Type
- Device ID
- Organization ID
- Authentication Token
Use of Authentication Token allows establishing a more secure connection of physical IoT device via MQTT protocol.
2.2 Configuration of NodeRed on Node.js
NodeRed is an OpenSource tool, which provides a convenient graphic interface to develop interaction rules for IoT devices. NodeRed is used as an intermediate to connect different types of devices with each other and\or cloud or DBMS. NodeRed operates on Node.JS platform and allows adjustment of necessary user settings via a browser.
NodeRed on cloud is available right after Starter Kit deployment.
By following the link https://creditscloud.eu-gb.mybluemix.net/ a window NodeRed flow editor will be opened.
The scheme below shows how the data from IBM IoT Platform about temperature is received and how the data is saved in NoSQL storage in Cloudant after its processing.
During data processing, the system compares the received data with the threshold and assign the message status as “safe” or “danger”.
2.3 Configuration of Cloudant storage
To access Cloudant storage, you will have to sign up in Service Credentials tab, which will be resulted in the generation of “login”, “password” and “URL”, with which you will get an access to the storage with REST API.
You have to configure Extensions to save data on Cloudant in separate structures.
The figure above shows the data being saved in DB with the name “measurement” with automatic daily creation of a new bucket to store metrics.
2.4 Configuration and connection of Raspberry Pi
Single-board Raspberry Pi 3 Model B with installed Raspbian OS is used as a source of data. Raspberry Pi can be used as both an independent source of IoT signals and IoT gateway, connecting all IoT devices, available in a Smart House.
Raspberry PI will send data about CPU temperature on MQTT protocol to IoT platform Watson every 5 seconds. Internet connection should be established and configured on the Single-board computer in advance.
You need to execute following commands on Raspberry Pi:
sudo apt-get update
sudo apt-get install nodered
Once NodeRed is installed you need to deploy it with the following command:
node-red-start
Right after NodeRed is deployed, you need to open the following link in the browser: http://{IP_address_raspberry}:1880/.
Then, open menu Menu – > Import -> Clipboard and copy json, available by the link below, to working space and press the “Deploy” button.
As soon as you completed these steps, Raspberry Pi will send the data about CPU temperature to Watson platform every 5 seconds.
Then, to process the data you have to register the device in Watson IoT Platform
in NodeRed window you should double-click the node “Event”
Select “Connection” “Registered”
Press the icon «Edit» in the field «Credentials»
Fill the fields with the values from p. 2.1:
- Organization
- Device Type
- Device ID
- Auth Token
and press the button “Add”, and then “Deploy”.
3 Integration with blockchain
Installation of the necessary modules.
$ sudo apt install python3-pip
$ pip3 install base58check.
The module is used to decode hashes in Base58 encoding
$ pip3 install ed25519
Encryption algorithm ed25519 is used to sign transactions
$ pip3 install colorful
$ pip3 install tqdm
These 2 modules are responsible for highlighting the text, displayed in the console with colour and also for displaying progress bar during processing of data, received from Cloudant
$ pip3 install thrift
Module for interaction with a node according to Thrift protocol
$ pip3 install cloudant
Module for connection with Cloudant storage.
3.1 Confirmation of connection with Cloudant
Below is a code, which allows confirmation whether the connection with Cloudant was successful or not:
from cloudant.client import Cloudant
from cloudant.error import CloudantException
from cloudant.result import Result, ResultByKey
client = Cloudant(“login”, “password”, url=”url”)
“login” “password” “URL” are the details, taken from p. 2.3. In case if there are no errors, encountered during execution of these commands, it means the connection was established.
3.2 Request of data from Cloudant
Request of data from cloudant occurs with the following code:
params = {'include_docs': 'true'}
end_point = '{0}/{1}/_all_docs'.format(cloudant_client.server_url, bucket_name)
response = cloudant_client.r_session.get(end_point, params=params)
The parameter “bucketname” must be the name of the requested bucket. This request will download all data of the requested bucket.
Cloudant specific feature is that each request can return the data in a random sequence, therefore it should be sorted (e.g. in ascending order) before the start of bucket’s hash calculation.
3.3 Smart Contract Deployment
The smart contract is used to store the pair “name:hash”. Its code is displayed below:
import java.util.HashMap;
import java.util.Map;
public class Contract extends SmartContract {
private String valueDetector = "";
private HashMap<String, String> hashesOfBlocks = new HashMap<>();
public String getValueDetector() {
return valueDetector;
}
public void setValueDetector(String valueDetector) {
this.valueDetector = valueDetector;
}
public void addBlock(String hash, String block){
hashesOfBlocks.put(hash, block);
}
public String getBlock(String hash){
return hashesOfBlocks.get(hash);
}
public String getAllBlocks(){
return hashesOfBlocks.toString();
}
}
The most convenient way to deploy smart contract is to upload it via the Wallet. For this purpose you need to click “ Smart Contracts” tab and then click on “Deploy new smart contract”
We recommend to copy smart contract code from a text editor (e.g. NotePad). To compile the source code, you should press “Build” button and then the code will be compiled in appropriate byte code. After pressing the “Deploy” button a Deploy-transaction will be formed, signed by the wallet EDS and sent to the p2p network then. Once the Deploy-transaction is validated, it is recorded to the blockchain.
Smart contract can be executed from the wallet by clicking on “Smart Contracts”, and then ”Execute”
3.4 Formation of a Transaction to call smart contract method
Transaction is formed within several steps:
- Filling smart contract fields in API Thrift format
- Serialization of USERFIELD for smart contract execution
- Filling all transaction fields except EDS
- Serialization of a transaction by using API Thrift
- Combination of serialized transaction and USERFIELD
- Adding EDS to the transaction
The following code is responsible for calling smart contract method with parameters “params”:
# SmartContractInvocation (USERFIELDS)
tr.smartContract = SmartContractInvocation()
tr.smartContract.method = method
tr.smartContract.params = params
tr.smartContract.forgetNewState = False
The code below executes serialization of a smart contract:
# Serialize SmartContractInvocation() by Thrift
transportOut = TTransport.TMemoryBuffer()
protocolOut = TBinaryProtocol.TBinaryProtocol(transportOut)
tr.smartContract.write(protocolOut)
serial_smart = transportOut.getvalue()
Python code below displays the structure of a transaction:
serial_transaction = pack('=6s32s32slqhbbi', #'=' - without alignment
lastInnerId, #6s - 6 byte InnerID (char[] C Type)
tr.source, #32s - 32 byte source public key (char[] C Type)
tr.target, #32s - 32 byte target pyblic key (char[] C Type)
tr.amount.integral, #i - 4 byte integer(int C Type)
tr.amount.fraction, #q - 8 byte integer(long long C Type)
tr.fee.commission, #h - 2 byte integer (short C Type)
tr.currency, #b - 1 byte integer (signed char C Type)
1, #b - 1 byte userfield_num
len(serial_smart)) #i - 4 byte userfield_len
Then the code’s serialization occurs:
# Add serialized SmartContractInvocation
full_serial_transaction = serial_transaction + serial_smart
# Calculate signing
keydata = base58check.b58decode(walletpriv)
# Create object ed25519
signing_key = ed25519.SigningKey(keydata)
# Receive digital signature msg
sign = signing_key.sign(full_serial_transaction)
# Writing EDS to the corresponding transaction field
tr.signature = sign
Use “Flow()” function to start smart contract execution
res = api.TransactionFlow(tr)
3.5 Recording the Data to the Blockchain
A script to add the data to the blockchain is launched on schedule. The launch period should match with the time period for storing the data in a single bucket. For instance, if a new bucket is created once an hour, then the script should be launched once an hour as well.
Add the pair “name:hash” to call a smart contract function “AddBlock”: a special transaction for execution is formed (all necessary fields of API Thrift are filled), it is signed and sent to Executor.
3.6 Blockchain Validator
The code, provided in appendix, operates according to the following algorithm:
- Establishing the connection with the blockchain via Thrift API
- Extracting all pairs “name:hash” from the blockchain
- Receiving corresponding buckets from Cloudant by using “name” as a key, calculating their hashes and comparing them with the values from the blockchain
- In case if hashes match, “OK” is displayed. If they don’t match, it means the data were tampered with and a warning “MODIFIED” is displayed. If the corresponding bucket was not found in Cloudant, an error “REMOVED” is displayed, indicating that the data was deleted.
3.7 Solution Testing
To test this solution you will need to sign up in Cloudant and change the value of a parameter in any bucket. When you will start validation process, a window displayed below will be opened:
The first column is a bucket number, the second one is a bucket hash, stored in the blockchain. A warning “MODIFIED” is displayed for the buckets, which hashes don’t match with those, which are stored in the blockchain. If the corresponding bucket wasn’t found in the storage, an error “REMOVED” is displayed.
Appendix 1. Full code of the validator
#!/usr/bin/env python
import sys
import glob
import base58check
import json
import hashlib
from datetime import datetime, date, time, timedelta
import time
import ed25519 #Signing transaction
from struct import * #Serialize struct
import collections
import colorful
from tqdm import tqdm
sys.path.append('gen-py')
from NodeApi import API
from executor import ContractExecutor
from general.ttypes import Variant
from NodeApi.ttypes import Transaction, TransactionFlowResult, Amount, AmountCommission, SmartContractInvocation, Pool, PoolListGetResult, StatsGetResult, PeriodStats
from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
from cloudant.client import Cloudant
from cloudant.error import CloudantException
from cloudant.result import Result, ResultByKey
from requests.adapters import HTTPAdapter
walletpriv='Wallet private key'
walletpub='Wallet public key'
smartpub='Smart contract public key'
def cloudant_setup():
httpAdapter = HTTPAdapter(pool_connections=15, pool_maxsize=100)
client = Cloudant("Cloudant_login",
"Cloudant_password",
url="Cloudant_URL",
connect=True,
auto_renew=True,
timeout=300,
adapter=httpAdapter)
return client
def smart_exec(api, wallet_id, smart_id, method, params):
# Get last Inner ID
wallet = api.WalletTransactionsCountGet(base58check.b58decode(wallet_id))
lastInnerId = bytearray((wallet.lastTransactionInnerId + 1).to_bytes(6,'little'))
# Get smartcontract bytecode
smart = api.SmartContractGet(base58check.b58decode(smart_id))
tr = Transaction()
tr.id = int.from_bytes(lastInnerId,byteorder='little', signed=False)
tr.source = base58check.b58decode(wallet_id)
tr.target = base58check.b58decode(smart_id)
# Amount
tr.amount = Amount()
tr.amount.integral = 0
tr.amount.fraction = 0
# Balance
tr.balance = Amount()
tr.balance.integral = 0
tr.balance.fraction = 0
tr.currency = 1
# Fee
tr.fee = AmountCommission()
tr.fee.commission = 0
# SmartContractInvocation (USERFIELDS)
tr.smartContract = SmartContractInvocation()
tr.smartContract.method = method
tr.smartContract.params = params
tr.smartContract.forgetNewState = False
# Serialize SmartContractInvocation() by Thrift
transportOut = TTransport.TMemoryBuffer()
protocolOut = TBinaryProtocol.TBinaryProtocol(transportOut)
tr.smartContract.write(protocolOut)
serial_smart = transportOut.getvalue()
serial_transaction = pack('=6s32s32slqhbbi', #'=' - without alignment
lastInnerId, #6s - 6 byte InnerID (char[] C Type)
tr.source, #32s - 32 byte source public key (char[] C Type)
tr.target, #32s - 32 byte target pyblic key (char[] C Type)
tr.amount.integral, #i - 4 byte integer(int C Type)
tr.amount.fraction, #q - 8 byte integer(long long C Type)
tr.fee.commission, #h - 2 byte integer (short C Type)
tr.currency, #b - 1 byte integer (signed char C Type)
1, #b - 1 byte userfield_num
len(serial_smart)) #i - 4 byte userfield_len
# Add serialized SmartContractInvocation
full_serial_transaction = serial_transaction + serial_smart
# Calculate signing
keydata = base58check.b58decode(walletpriv)
# Create object ed25519
signing_key = ed25519.SigningKey(keydata)
# Receive digital signature msg
sign = signing_key.sign(full_serial_transaction)
tr.signature = sign
res = api.TransactionFlow(tr)
def node_setup():
# Make socket
transport = TSocket.TSocket('IP_CLIENT', 9090)
transport_exec = TSocket.TSocket('IP_EXECUTOR', 9080)
# Buffering is critical. Raw sockets are very slow
transport = TTransport.TBufferedTransport(transport)
transport_exec = TTransport.TBufferedTransport(transport_exec)
# Wrap in a protocol'
protocol = TBinaryProtocol.TBinaryProtocol(transport)
protocol_exec = TBinaryProtocol.TBinaryProtocol(transport_exec)
# Create a client to use the protocol encoder
client = API.Client(protocol)
client_exec = ContractExecutor.Client(protocol_exec) #running get method
# Connect!
transport.open()
transport_exec.open()
return client, client_exec
def main():
# setup all clients: api, executor, cloudant
client, client_exec = node_setup()
cloudant_client = cloudant_setup()
# get all storage block from cloudant
listStorageName = []
for k in cloudant_client.all_dbs():
if 'iotp_hmoyaf_measurements_2' in str(k):
listStorageName.append(k)
curr_day = datetime.strftime(datetime.now(), '%Y-%m-%d')
next_day = datetime.strftime(datetime.now() + timedelta(days=1), '%Y-%m-%d')
# not compare hash for cur_day and next_day
listStorageName.remove('iotp_hmoyaf_measurements_'+curr_day)
listStorageName.remove('iotp_hmoyaf_measurements_'+next_day)
# Run addBlock method for SmartContract
params = {'include_docs': 'true'}
# mparams = [Variant(),Variant()]
# method = 'addBlock'
dictCloud = {}
for k in tqdm(listStorageName):
start = datetime.now()
end_point = '{0}/{1}/_all_docs'.format(cloudant_client.server_url, k)
response = cloudant_client.r_session.get(end_point, params=params)
a = json.loads(response.text)
sha1 = hashlib.sha1()
sha1.update(json.dumps(a['rows']).encode())
sig = sha1.digest().hex()
dictCloud.update({k:sig})
period = datetime.now() - start
tqdm.write(f"{a['total_rows']} docs in {k} storage with {sig} hash in {period}")
time.sleep(1)
# Obtain all pair from smart_contract
#
# Run Smart via executor
wallet = client.WalletTransactionsCountGet(base58check.b58decode(walletpub))
# Get bytecode from smartcontract
smarttmp = client.SmartContractGet(base58check.b58decode(smartpub))
# Execute smartcontract method
res = client_exec.executeByteCode(base58check.b58decode(smartpub),
smarttmp.smartContract.smartContractDeploy.byteCode,
smarttmp.smartContract.objectState,
'getAllBlocks',
[],
10000)
# remove first and last " from string
string = res.ret_val.v_string[1:-1]
# parse string to Dict (block_name: hash)
dictChain = dict(x.split('=') for x in string.split(', '))
# print head of table
print(f"\n{colorful.bold_white}{'BLOCK NAME':^40} {'HASH':^40} " + f"{colorful.bold_white}{'STATUS':<10}\n")
# compare hash from smart and from cloudant
for key,val in collections.OrderedDict(sorted(dictChain.items())).items():
try:
if (dictChain[key] != dictCloud[key]):
print(f"{colorful.white}{key:<40}: {val:<40} - " + f"{colorful.yellow}{'MODIFIED':<10}")
else:
print(f"{colorful.white}{key:<40}: {colorful.italic}{val:<40} - " + f"{colorful.green}{'OK':<10}")
except KeyError:
print(f"{colorful.white}{key:<40}: {val:<40} - "+ f"{colorful.red}{'REMOVED':<10}")
print(f'{colorful.white}.')
# Close!
# transport.close()
# transport_exec.close()
if __name__ == '__main__':
try:
main()
except Thrift.TException as tx:
print('%s' % tx.message)