nodejs 操作 es

Nodejs Elasticsearch教程示例是今天的主要主题。 Elasticsearch是一个开源搜索引擎,由于其高性能和分布式架构而变得非常流行。 Elasticsearch构建于Apache Lucene之上,后者是一个高性能的文本搜索引擎库。 虽然Elasticsearch可以执行数据的存储和检索,但其主要目的不是用作数据库。 相反,它是一个搜索引擎(服务器),其主要目标是索引,搜索和提供有关数据的实时统计信息。

在本教程中,我们将与Node.js集成并使用它来索引和搜索数据。 导入数据后,它立即可用于搜索。 Elasticsearch是schema-less,可以将数据存储在JSON文档中,并且可以自动检测数据结构及其类型。

Elasticsearch也完全由API驱动。 这意味着几乎所有操作都可以通过使用HTTP上的JSON数据的简单RESTful API完成。 它有几乎所有编程语言的客户端库,包括JavaScript。 在此示例中,我们将使用官方客户端库。

准备工作

安装Elasticsearch

请按照文章“如何在Linux,MacOS及Windows上进行安装Elasticsearch”进行安装Elasticsearch。针对Mac用户,你也可以使用如下的命令来安装Elasticsearch:

$ brew install elasticsearch

运行Elasticsearch

我们可以在Mac上用如下的方式运行Elasticsearch:

$ brew services start elasticsearch

连接Elasticsearch

我们在我们的电脑上创建一个目录,比如说:

$ mkdir node-elastic
$ cd node-elastic

在这个目录中,我们可以通过如下的命令来创建我们的nodejs项目的package.json文件。

$ npm init

我通过选默认的选项

{
  "name": "elastic",
  "version": "1.0.0",
  "description": "",
  "main": "index.js",
  "scripts": {
    "start": "node index.js"
  },
  "author": "",
  "license": "ISC",
  "dependencies": {
  }

这里,我对scripts部分做了一点小的修改。这样我们就可以通过npm start来运行我们的index.js文件。

就像上面显示的那样,我们来创建一个叫做index.js的文件。它的内容如下:

// index.js

const express = require('express');
const elasticsearch = require('elasticsearch');
const fs = require('fs');
const app = express();

const PORT = 5000;

const client = new elasticsearch.Client({
    host: '127.0.0.1:9200',
    log: 'error'
 });

app.listen(PORT, function() {
    console.log('Server is running on PORT:',PORT);
});

在我们的上面的代码中我们需要express及elasticsearch两个模块。我们需要通过的如下的方法来进行安装:

$ npm install express --save
$ npm install elasticsearch --save

通过 –save选项,我们把我们需要的模块存于到我们当前的目录中。

我们可以通过如下的方法来运行我们的应用:

$ node index.js

或者:

$ npm start

如果我们的index.js运行成功的话,它将在PORT:5000口地址上。而我们的Elasticsearch服务器运行在PORT:9200上。显示的结果如下:

Server is running on PORT: 5000

nodejs操作elasticsearch

创建一个样本数据

我们需要创建一个样本数据。这个数据将被读入进来并存于Elasticsearch之中。一旦数据被录入到Elasticsearch中,我们就可以对它们进行搜索。我们在项目的根目录中创建一个叫做data.json的文件,并把如下的内容输入进去:

[
  {
    "id": "2",
    "user" : "双榆树-张三",
    "message" : "今儿天气不错啊,出去转转去",
    "uid" : 2,
    "age" : 20,
    "city" : "北京",
    "province" : "北京",
    "country" : "中国",
    "address" : "中国北京市海淀区",
    "location" : {
        "lat" : "39.970718",
        "lon" : "116.325747"
      }
  },
  {
    "id": "3",
    "user" : "东城区-老刘",
    "message" : "出发,下一站云南!",
    "uid" : 3,
    "age" : 30,
    "city" : "北京",
    "province" : "北京",
    "country" : "中国",
    "address" : "中国北京市东城区台基厂三条3号",
    "location" : {
      "lat" : "39.904313",
      "lon" : "116.412754"
    }
  },
  {
    "id": "4",
    "user" : "东城区-李四",
    "message" : "happy birthday!",
    "uid" : 4,
    "age" : 30,
    "city" : "北京",
    "province" : "北京",
    "country" : "中国",
    "address" : "中国北京市东城区",
    "location" : {
      "lat" : "39.893801",
      "lon" : "116.408986"
    }
  },
  {
    "id": "7",
    "user" : "虹桥-老吴",
    "message" : "好友来了都今天我生日,好友来了,什么 birthday happy 就成!",
    "uid" : 7,
    "age" : 90,
    "city" : "上海",
    "province" : "上海",
    "country" : "中国",
    "address" : "中国上海市闵行区",
    "location" : {
      "lat" : "31.175927",
      "lon" : "121.383328"
    }
  }
]

它是一个由4个元素组成的数组。 因此,当我们索引数据时,如果我们正确地索引它,那么数据的长度是4,我们将证明它。

在实际的应用中,这些数据可能来自实时数据或数据等各种数据源。

把样本数据加入进来进行indexing

所有的数据在加入到Elasticsearch中都需要进行一项indexing的工作。我们继续修改我们的index.js文件如下:

const express = require('express');
var elasticsearch = require('elasticsearch');
const fs = require('fs');
const app = express();

const PORT = 5000;

const client = new elasticsearch.Client({
    host: '127.0.0.1:9200',
    log: 'error'
});

client.ping({ requestTimeout: 30000 }, function(error) {
    if (error) {
        console.error('elasticsearch cluster is down!');
    } else {
        console.log('Everything is ok');
    }
});


const bulkIndex = function bulkIndex(index, type, data) {
    let bulkBody = [];
    data.forEach(item => {
    bulkBody.push({
        index: {
            _index: index,
            _type: type,
            _id: item.id
        }
    });
 
    bulkBody.push(item);
});
 
client.bulk({body: bulkBody})
    .then(response => {
        let errorCount = 0;
        response.items.forEach(item => {
            if (item.index && item.index.error) {
                console.log(++errorCount, item.index.error);
            }
        });
        console.log(
            `Successfully indexed ${data.length - errorCount}
     out of ${data.length} items`
        );
    })
    .catch(console.err);
  };

async function indexData() {
    const twittersRaw = await fs.readFileSync('./data.json');
    const twitters = JSON.parse(twittersRaw);
    console.log(`${twitters.length} items parsed from data file`);
    bulkIndex('twitter2', '_doc', twitters);
};

indexData();

app.listen(PORT, function() {
    console.log('Server is running on PORT:',PORT);
});

这里,我们可以看到在indexData()方法里,我们首先通过readFileSync来把我们的data.json中的数据读入进来。通过JSON.parse把数据转化为Javascript的object。最终,我们通过bulkIndex来引用elasticsearch的API来完成数据的录入。就像我们之前所说的那样,这个数据在实际的应用中可能来自其它的种类的数据源。如果大家想了解什么是bulk API,可以参照我之前的教程 “开始使用Elasticsearch (1)”。它可以很快地将很多文档在一起通过一次的REST接口调用,并完成数据的indexing。

验证录入的数据

在上面的一节中,我们已经通过bulk API接口把我们的数据录入到系统中了。接下来我们看看如何通过elasticsearch的方法来把我们的录入的数据读出来。我们在当前目录创建一个新的文件verify.js。

// verify.js

const elasticsearch = require('elasticsearch');

const client = new elasticsearch.Client({
    host: '127.0.0.1:9200',
    log: 'error'
 });


function indices() {
    return client.cat.indices({v: true})
    .then(console.log)
    .catch(err => console.error(`Error connecting to the es client: ${err}`));
  }

module.exports = function verify() {
    console.log(`elasticsearch indices information:`);
    indices();
}

在这里,我们通过client.cat.indices接口来显示所有目前的index。为了使用刚才建立的verify.js文件,我们需要更进一步修改我们之前的index.js:

const express = require('express');
var elasticsearch = require('elasticsearch');
const fs = require('fs');
const app = express();

const PORT = 5000;
const verify = require('./verify');

const client = new elasticsearch.Client({
    host: '127.0.0.1:9200',
    log: 'error'
});

client.ping({ requestTimeout: 30000 }, function(error) {
    if (error) {
        console.error('elasticsearch cluster is down!');
    } else {
        console.log('Everything is ok');
    }
});


const bulkIndex = function bulkIndex(index, type, data) {
    let bulkBody = [];
data.forEach(item => {
    bulkBody.push({
        index: {
            _index: index,
            _type: type,
            _id: item.id
        }
    });
 
    bulkBody.push(item);
});
 
client.bulk({body: bulkBody})
    .then(response => {
        let errorCount = 0;
        response.items.forEach(item => {
            if (item.index && item.index.error) {
                console.log(++errorCount, item.index.error);
            }
        });
        console.log(
            `Successfully indexed ${data.length - errorCount}
     out of ${data.length} items`
        );
    })
    .catch(console.err);
    };

async function indexData() {
    const twittersRaw = await fs.readFileSync('./data.json');
    const twitters = JSON.parse(twittersRaw);
    console.log(`${twitters.length} items parsed from data file`);
    bulkIndex('twitter2', '_doc', twitters);
};

indexData();
verify();

app.listen(PORT, function() {
    console.log('Server is running on PORT:',PORT);
});

注意前面的

const verify = require('./verify');

及后面的verify()调用

...
indexData();
verify();
...

我们重新运行我们的index.js。显示的结果如下:

img

我们可以看到有4个文档被导入到Elasticsearch,并有一个叫做twitter2的index被创建。这意味着我们已经成功地把数据录入到Elasticsearch中。在接下来的章节中,我们来搜索我们其中的数据。

搜索Elasticsearch索引中的数据

首先,我们来搜索所有的twitter文档。我们在项目的目录中创建一个新的文件叫做search.js。它的内容如下:

// search.js

const elasticsearch = require('elasticsearch');

const client = new elasticsearch.Client({
    host: '127.0.0.1:9200',
    log: 'error'
});

const search = function search(index, body) {
    return client.search({index: index, body: body});
};

module.exports =  function searchData() {
    let body = {
        size: 4,
        from: 0,
        query: {
            match_all: {}
        }
    };
search('twitter2', body)
    .then(results => {
        console.log(`found ${results.hits.total} items in ${results.took}ms`);
        console.log(`returned twitters:`);
        results.hits.hits.forEach(
            (hit, index) => console.log(
                hit._source
            )
        )
    })
    .catch(console.error);
};

这是一个崭新的模块。因为我们已经为我们的数据进行了索引,我只需要把数据从Elasticsearch中读出来。我们对search()方法传入了两个参数:

index名称 查询的DSL body 如果大家对DSL不是很熟的话,请参阅我之前的教程“开始使用Elasticsearch (2)”。这里的body含有一个我们需要返回的size已经进行查询所需要的query语句。

为了使得我们的search.js能够被使用,我们更进一步修改我们的index.js文件:

const express = require('express');
var elasticsearch = require('elasticsearch');
const fs = require('fs');
const app = express();

const PORT = 5000;
const verify = require('./verify');
const searchData = require('./search');

const client = new elasticsearch.Client({
    host: '127.0.0.1:9200',
    log: 'error'
});

client.ping({ requestTimeout: 30000 }, function(error) {
    if (error) {
        console.error('elasticsearch cluster is down!');
    } else {
        console.log('Everything is ok');
    }
});


const bulkIndex = function bulkIndex(index, type, data) {
    let bulkBody = [];
data.forEach(item => {
    bulkBody.push({
        index: {
            _index: index,
            _type: type,
            _id: item.id
        }
    });

    bulkBody.push(item);
});

client.bulk({body: bulkBody})
    .then(response => {
        let errorCount = 0;
        response.items.forEach(item => {
            if (item.index && item.index.error) {
                console.log(++errorCount, item.index.error);
            }
        });
        console.log(
            `Successfully indexed ${data.length - errorCount}
     out of ${data.length} items`
        );
    })
    .catch(console.err);
    };

async function indexData() {
    const twittersRaw = await fs.readFileSync('./data.json');
    const twitters = JSON.parse(twittersRaw);
    console.log(`${twitters.length} items parsed from data file`);
    bulkIndex('twitter2', '_doc', twitters);
};

// indexData();
// verify();
searchData();

app.listen(PORT, function() {
    console.log('Server is running on PORT:',PORT);
});

请注意文件开头的部分:

const searchData = require('./search');

及后面的verify()调用

...
// indexData();
// verify();
searchData();
...

这次运行时,我们只做search的动作,所以把之前的index及verify部分代码注释掉。存下我们的代码,并重新运行我们的应用:

returned twitters:
{
  id: '2',
  user: '双榆树-张三',
  message: '今儿天气不错啊,出去转转去',
  uid: 2,
  age: 20,
  city: '北京',
  province: '北京',
  country: '中国',
  address: '中国北京市海淀区',
  location: { lat: '39.970718', lon: '116.325747' }
}
{
  id: '3',
  user: '东城区-老刘',
  message: '出发,下一站云南!',
  uid: 3,
  age: 30,
  city: '北京',
  province: '北京',
  country: '中国',
  address: '中国北京市东城区台基厂三条3号',
  location: { lat: '39.904313', lon: '116.412754' }
}
{
  id: '4',
  user: '东城区-李四',
  message: 'happy birthday!',
  uid: 4,
  age: 30,
  city: '北京',
  province: '北京',
  country: '中国',
  address: '中国北京市东城区',
  location: { lat: '39.893801', lon: '116.408986' }
}
{
  id: '7',
  user: '虹桥-老吴',
  message: '好友来了都今天我生日,好友来了,什么 birthday happy 就成!',
  uid: 7,
  age: 90,
  city: '上海',
  province: '上海',
  country: '中国',
  address: '中国上海市闵行区',
  location: { lat: '31.175927', lon: '121.383328' }
}

搜索特定术语

我们在项目的根目录下创建一个新的文件search_term.js,并且加入如下的代码:

// search_term.js

const elasticsearch = require('elasticsearch');

const client = new elasticsearch.Client({
    host: '127.0.0.1:9200',
    log: 'error'
});

const search = function search(index, body) {
    return client.search({index: index, body: body});
};

module.exports = function searchTerm() {
    let body = {
        size: 4,
        from: 0,
        query: {
            match: {
                city: {
                    query: '北京'
                }
            }
        }
    };
console.log(`retrieving documents whose twitter matches '${body.query.match.city.query}' (displaying ${body.size} items at a time)...`);
search('twitter2', body)
    .then(results => {
        console.log(`found ${results.hits.total} items in ${results.took}ms`);
        if (results.hits.total > 0) console.log(`returned twitters:`);
        results.hits.hits.forEach(hit => console.log(hit._source));
    })
    .catch(console.error);
    };

在这里,我们使用如下的query:

query: {
        match: {
          city: {
            query: '北京'
          }
        }
      }

我寻找所有城市是“北京”的所有的文档。

同样,我们需要对我们的index.js做相应的改动:

const express = require('express');
var elasticsearch = require('elasticsearch');
const fs = require('fs');
const app = express();

const PORT = 5000;
const verify = require('./verify');
const searchData = require('./search');
const searctTerm = require('./search_term');

const client = new elasticsearch.Client({
    host: '127.0.0.1:9200',
    log: 'error'
});

client.ping({ requestTimeout: 30000 }, function(error) {
    if (error) {
        console.error('elasticsearch cluster is down!');
    } else {
        console.log('Everything is ok');
    }
});


const bulkIndex = function bulkIndex(index, type, data) {
    let bulkBody = [];
data.forEach(item => {
    bulkBody.push({
        index: {
            _index: index,
            _type: type,
            _id: item.id
        }
    });

    bulkBody.push(item);
});

client.bulk({body: bulkBody})
    .then(response => {
        let errorCount = 0;
        response.items.forEach(item => {
            if (item.index && item.index.error) {
                console.log(++errorCount, item.index.error);
            }
        });
        console.log(
            `Successfully indexed ${data.length - errorCount}
     out of ${data.length} items`
        );
    })
    .catch(console.err);
    };

async function indexData() {
    const twittersRaw = await fs.readFileSync('./data.json');
    const twitters = JSON.parse(twittersRaw);
    console.log(`${twitters.length} items parsed from data file`);
    bulkIndex('twitter2', '_doc', twitters);
};

// indexData();
// verify();
// searchData();
searctTerm();

app.listen(PORT, function() {
    console.log('Server is running on PORT:',PORT);
});

请注意在index.js的前面的这行代码:

const searctTerm = require('./search_term');

以及后面的这个部分:

...
// indexData();
// verify();
// searchData();
searctTerm();
...

运行我们的应用,我们可以看到如下的结果:

Found [object Object] items in 3ms
{
  id: '2',
  user: '双榆树-张三',
  message: '今儿天气不错啊,出去转转去',
  uid: 2,
  age: 20,
  city: '北京',
  province: '北京',
  country: '中国',
  address: '中国北京市海淀区',
  location: { lat: '39.970718', lon: '116.325747' }
}
{
  id: '3',
  user: '东城区-老刘',
  message: '出发,下一站云南!',
  uid: 3,
  age: 30,
  city: '北京',
  province: '北京',
  country: '中国',
  address: '中国北京市东城区台基厂三条3号',
  location: { lat: '39.904313', lon: '116.412754' }
}
{
  id: '4',
  user: '东城区-李四',
  message: 'happy birthday!',
  uid: 4,
  age: 30,
  city: '北京',
  province: '北京',
  country: '中国',
  address: '中国北京市东城区',
  location: { lat: '39.893801', lon: '116.408986' }
}

在这里我们看到city为北京的所有的文档。其中的一个上海的文档是不在这里面。

到这里,我们已经完成了所有的这个教程。如果你想了解更多关于Elastic Stack的知识,请参阅我们的官方网站:https://www.elastic.co/guide/index.html

这个教程的源码可以在地址找到:https://github.com/liu-xiao-guo/elasticsearch-js

版权声明:本文为CSDN博主「Elastic 中国社区官方博客」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。 原文链接:https://blog.csdn.net/UbuntuTouch/article/details/100112283

这些信息有用吗?
Do you have any suggestions for improvement?

Thanks for your feedback!