mirror of
https://github.com/node-red/node-red-nodes.git
synced 2025-03-01 10:37:43 +00:00
Update 27-twitter.js
Update the public search and user look up parts to work with twitter API V2.0
This commit is contained in:
parent
5aa99eca73
commit
6aea983855
@ -1,4 +1,4 @@
|
|||||||
|
//new updated code on Oct 3rd
|
||||||
module.exports = function (RED) {
|
module.exports = function (RED) {
|
||||||
"use strict";
|
"use strict";
|
||||||
var Ntwitter = require('twitter-ng');
|
var Ntwitter = require('twitter-ng');
|
||||||
@ -12,6 +12,16 @@ module.exports = function(RED) {
|
|||||||
var userObjectCache = {};
|
var userObjectCache = {};
|
||||||
var userSreenNameToIdCache = {};
|
var userSreenNameToIdCache = {};
|
||||||
|
|
||||||
|
RED.nodes.registerType("twitter-credentials", TwitterCredentialsNode, {
|
||||||
|
credentials: {
|
||||||
|
consumer_key: { type: "password" },
|
||||||
|
consumer_secret: { type: "password" },
|
||||||
|
access_token: { type: "password" },
|
||||||
|
access_token_secret: { type: "password" },
|
||||||
|
access_token_bearer: { type: "password" }
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
function TwitterCredentialsNode(n) {
|
function TwitterCredentialsNode(n) {
|
||||||
RED.nodes.createNode(this, n);
|
RED.nodes.createNode(this, n);
|
||||||
this.screen_name = n.screen_name;
|
this.screen_name = n.screen_name;
|
||||||
@ -21,42 +31,81 @@ module.exports = function(RED) {
|
|||||||
if (this.credentials.consumer_key &&
|
if (this.credentials.consumer_key &&
|
||||||
this.credentials.consumer_secret &&
|
this.credentials.consumer_secret &&
|
||||||
this.credentials.access_token &&
|
this.credentials.access_token &&
|
||||||
this.credentials.access_token_secret) {
|
this.credentials.access_token_secret &&
|
||||||
|
this.credentials.access_token_bearer) {
|
||||||
|
|
||||||
this.oauth = {
|
this.oauth = {
|
||||||
consumer_key: this.credentials.consumer_key,
|
consumer_key: this.credentials.consumer_key,
|
||||||
consumer_secret: this.credentials.consumer_secret,
|
consumer_secret: this.credentials.consumer_secret,
|
||||||
token: this.credentials.access_token,
|
token: this.credentials.access_token,
|
||||||
token_secret: this.credentials.access_token_secret
|
token_secret: this.credentials.access_token_secret,
|
||||||
|
token_bearer: this.credentials.access_token_bearer
|
||||||
}
|
}
|
||||||
this.credHash = crypto.createHash('sha1').update(
|
this.credHash = crypto.createHash('sha1').update(
|
||||||
this.credentials.consumer_key + this.credentials.consumer_secret +
|
this.credentials.consumer_key + this.credentials.consumer_secret +
|
||||||
this.credentials.access_token + this.credentials.access_token_secret
|
this.credentials.access_token + this.credentials.access_token_secret
|
||||||
|
+ this.credentials.access_token_bearer
|
||||||
).digest('base64');
|
).digest('base64');
|
||||||
var self = this;
|
var self = this;
|
||||||
if (localUserCache.hasOwnProperty(self.credHash)) {
|
|
||||||
this.localIdentityPromise = Promise.resolve(localUserCache[self.credHash]);
|
const needle = require('needle');
|
||||||
} else {
|
var credentials = RED.nodes.getCredentials(self);
|
||||||
this.localIdentityPromise = this.get("https://api.twitter.com/1.1/account/settings.json").then(function(body) {
|
|
||||||
if (body.status === 200) {
|
// The code below sets the bearer token from your environment variables
|
||||||
localUserCache[self.credHash] = body.body.screen_name;
|
// To set environment variables on macOS or Linux, run the export command below from the terminal:
|
||||||
self.screen_name = body.body.screen_name;
|
// export BEARER_TOKEN='YOUR-TOKEN'
|
||||||
} else {
|
|
||||||
self.warn("Failed to get user profile");
|
const token = this.credentials.access_token_bearer;
|
||||||
|
|
||||||
|
const endpointUserURL = "https://api.twitter.com/2/users/by?usernames="
|
||||||
|
|
||||||
|
async function getRequest() {
|
||||||
|
|
||||||
|
// These are the parameters for the API request
|
||||||
|
// specify User names to fetch, and any additional fields that are required
|
||||||
|
// by default, only the User ID, name and user name are returned
|
||||||
|
|
||||||
|
|
||||||
|
const params = {
|
||||||
|
usernames: `${self.screen_name}`, // Edit usernames to look up
|
||||||
|
"user.fields": "created_at,description", // Edit optional query parameters here
|
||||||
|
"expansions": "pinned_tweet_id"
|
||||||
}
|
}
|
||||||
});
|
|
||||||
|
console.log(self.screen_name);
|
||||||
|
|
||||||
|
// this is the HTTP header that adds bearer token authentication
|
||||||
|
const res = await needle('get', endpointUserURL, params, {
|
||||||
|
headers: {
|
||||||
|
"authorization": `Bearer ${token}`
|
||||||
}
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
if (res.statusCode === 200) {
|
||||||
|
return res.body;
|
||||||
|
} else {
|
||||||
|
node.send("Failed to get user profile");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
RED.nodes.registerType("twitter-credentials",TwitterCredentialsNode,{
|
(async () => {
|
||||||
credentials: {
|
|
||||||
consumer_key: { type: "password"},
|
try {
|
||||||
consumer_secret: { type: "password" },
|
// Make request
|
||||||
access_token: {type: "password"},
|
const response = await getRequest();
|
||||||
access_token_secret: {type:"password"}
|
// console.dir(response, {
|
||||||
|
// depth: null
|
||||||
|
// });
|
||||||
|
|
||||||
|
} catch (e) {
|
||||||
|
console.log(e);
|
||||||
}
|
}
|
||||||
});
|
})();
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
TwitterCredentialsNode.prototype.get = function (url, opts) {
|
TwitterCredentialsNode.prototype.get = function (url, opts) {
|
||||||
var node = this;
|
var node = this;
|
||||||
opts = opts || {};
|
opts = opts || {};
|
||||||
@ -169,7 +218,6 @@ module.exports = function(RED) {
|
|||||||
this.twitterConfig = RED.nodes.getNode(this.twitter);
|
this.twitterConfig = RED.nodes.getNode(this.twitter);
|
||||||
this.poll_ids = [];
|
this.poll_ids = [];
|
||||||
this.timeout_ids = [];
|
this.timeout_ids = [];
|
||||||
|
|
||||||
var credentials = RED.nodes.getCredentials(this.twitter);
|
var credentials = RED.nodes.getCredentials(this.twitter);
|
||||||
this.status({});
|
this.status({});
|
||||||
|
|
||||||
@ -198,55 +246,148 @@ module.exports = function(RED) {
|
|||||||
consumer_key: credentials.consumer_key,
|
consumer_key: credentials.consumer_key,
|
||||||
consumer_secret: credentials.consumer_secret,
|
consumer_secret: credentials.consumer_secret,
|
||||||
access_token_key: credentials.access_token,
|
access_token_key: credentials.access_token,
|
||||||
access_token_secret: credentials.access_token_secret
|
access_token_secret: credentials.access_token_secret,
|
||||||
|
access_token_bearer: credentials.access_token_bearer
|
||||||
});
|
});
|
||||||
|
|
||||||
// Stream public tweets
|
// Stream public tweets
|
||||||
try {
|
|
||||||
var thing = 'statuses/filter';
|
|
||||||
var tags = node.tags;
|
|
||||||
var st = { track: [tags] };
|
|
||||||
|
|
||||||
var setupStream = function() {
|
const needle = require('needle');
|
||||||
if (node.restart) {
|
|
||||||
node.status({fill:"green", shape:"dot", text:(tags||" ")});
|
// The code below sets the bearer token from your environment variables
|
||||||
twit.stream(thing, st, function(stream) {
|
// To set environment variables on macOS or Linux, run the export command below from the terminal:
|
||||||
//console.log("ST",st);
|
// export BEARER_TOKEN='YOUR-TOKEN'
|
||||||
|
const token = credentials.access_token_bearer;
|
||||||
|
|
||||||
|
const rulesURL = 'https://api.twitter.com/2/tweets/search/stream/rules';
|
||||||
|
const streamURL = 'https://api.twitter.com/2/tweets/search/stream';
|
||||||
|
|
||||||
|
// this sets up two rules - the value is the search terms to match on, and the tag is an identifier that
|
||||||
|
// will be applied to the Tweets return to show which rule they matched
|
||||||
|
// with a standard project with Basic Access, you can add up to 25 concurrent rules to your stream, and
|
||||||
|
// each rule can be up to 512 characters long
|
||||||
|
|
||||||
|
// Edit rules as desired below
|
||||||
|
const rules = [{
|
||||||
|
'value': node.tags,
|
||||||
|
'tag': node.tags
|
||||||
|
}];
|
||||||
|
|
||||||
|
async function getAllRules() {
|
||||||
|
|
||||||
|
const response = await needle('get', rulesURL, {
|
||||||
|
headers: {
|
||||||
|
"authorization": `Bearer ${token}`
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
if (response.statusCode !== 200) {
|
||||||
|
node.send("Error:", response.statusMessage, response.statusCode)
|
||||||
|
throw new Error(response.body);
|
||||||
|
}
|
||||||
|
|
||||||
|
return (response.body);
|
||||||
|
}
|
||||||
|
|
||||||
|
async function deleteAllRules(rules) {
|
||||||
|
|
||||||
|
if (!Array.isArray(rules.data)) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
const ids = rules.data.map(rule => rule.id);
|
||||||
|
|
||||||
|
const data = {
|
||||||
|
"delete": {
|
||||||
|
"ids": ids
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const response = await needle('post', rulesURL, data, {
|
||||||
|
headers: {
|
||||||
|
"content-type": "application/json",
|
||||||
|
"authorization": `Bearer ${token}`
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
if (response.statusCode !== 200) {
|
||||||
|
throw new Error(response.body);
|
||||||
|
}
|
||||||
|
|
||||||
|
return (response.body);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
async function setRules() {
|
||||||
|
|
||||||
|
const data = {
|
||||||
|
"add": rules
|
||||||
|
}
|
||||||
|
|
||||||
|
const response = await needle('post', rulesURL, data, {
|
||||||
|
headers: {
|
||||||
|
"content-type": "application/json",
|
||||||
|
"authorization": `Bearer ${token}`
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
if (response.statusCode !== 201) {
|
||||||
|
throw new Error(response.body);
|
||||||
|
}
|
||||||
|
|
||||||
|
return (response.body);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
function streamConnect(retryAttempt) {
|
||||||
|
var flag = false;
|
||||||
|
const stream = needle.get(streamURL, {
|
||||||
|
headers: {
|
||||||
|
"User-Agent": "v2FilterStreamJS",
|
||||||
|
"Authorization": `Bearer ${token}`
|
||||||
|
},
|
||||||
|
timeout: 20000
|
||||||
|
});
|
||||||
node.stream = stream;
|
node.stream = stream;
|
||||||
stream.on('data', function(tweet) {
|
|
||||||
if (tweet.user !== undefined) {
|
stream.on('data', data => {
|
||||||
var where = tweet.user.location;
|
try {
|
||||||
var la = tweet.lang || tweet.user.lang;
|
const json = JSON.parse(data);
|
||||||
var msg = { topic:node.topic+"/"+tweet.user.screen_name, payload:tweet.text, lang:la, tweet:tweet };
|
// console.log(json);
|
||||||
if (where) {
|
node.status({ fill: "green", shape: "dot", text: (tags || " ") });
|
||||||
msg.location = {place:where};
|
node.send({ topic: "tweet", payload: json.data.text });
|
||||||
addLocationToTweet(msg);
|
|
||||||
|
// A successful connection resets retry count.
|
||||||
|
retryAttempt = 0;
|
||||||
|
} catch (e) {
|
||||||
|
if (data.detail === "This stream is currently at the maximum allowed connection limit.") {
|
||||||
|
console.log(data.detail)
|
||||||
|
process.exit(1)
|
||||||
|
} else {
|
||||||
|
// Keep alive signal received. Do nothing.
|
||||||
}
|
}
|
||||||
node.send(msg);
|
|
||||||
//node.status({fill:"green", shape:"dot", text:(tags||" ")});
|
|
||||||
}
|
}
|
||||||
});
|
}).on('err', error => {
|
||||||
stream.on('limit', function(tweet) {
|
if (error.code !== 'ECONNRESET') {
|
||||||
//node.status({fill:"grey", shape:"dot", text:RED._("twitter.errors.limitrate")});
|
console.log(error.code);
|
||||||
node.status({fill:"grey", shape:"dot", text:(tags||" ")});
|
process.exit(1);
|
||||||
node.tout2 = setTimeout(function() { node.status({fill:"green", shape:"dot", text:(tags||" ")}); },10000);
|
} else {
|
||||||
});
|
// This reconnection logic will attempt to reconnect when a disconnection is detected.
|
||||||
stream.on('error', function(tweet,rc) {
|
// To avoid rate limits, this logic implements exponential backoff, so the wait time
|
||||||
//console.log("ERRO",rc,tweet);
|
// will increase if the client cannot reconnect to the stream.
|
||||||
if (rc == 420) {
|
setTimeout(() => {
|
||||||
node.status({fill:"red", shape:"ring", text:RED._("twitter.errors.ratelimit")});
|
console.warn("A connection error occurred. Reconnecting...")
|
||||||
|
streamConnect(++retryAttempt);
|
||||||
|
node.status({ fill: "red", shape: "ring", text: RED._("twitter.errors") });
|
||||||
|
}, 2 ** retryAttempt)
|
||||||
}
|
}
|
||||||
else {
|
|
||||||
node.status({fill:"red", shape:"ring", text:tweet.toString()});
|
|
||||||
node.warn(RED._("twitter.errors.streamerror",{error:tweet.toString(),rc:rc}));
|
|
||||||
}
|
|
||||||
twitterRateTimeout = Date.now() + retry;
|
|
||||||
if (node.restart) {
|
if (node.restart) {
|
||||||
node.tout = setTimeout(function () { setupStream() }, retry);
|
node.tout = setTimeout(function () { setupStream() }, retry);
|
||||||
}
|
}
|
||||||
});
|
}).on('limit', limit => {
|
||||||
stream.on('destroy', function (response) {
|
//node.status({fill:"grey", shape:"dot", text:RED._("twitter.errors.limitrate")});
|
||||||
//console.log("DEST",response)
|
node.status({ fill: "grey", shape: "dot", text: (tags || " ") });
|
||||||
|
}).on('destroy', function (response) {
|
||||||
twitterRateTimeout = Date.now() + 15000;
|
twitterRateTimeout = Date.now() + 15000;
|
||||||
if (node.restart) {
|
if (node.restart) {
|
||||||
node.status({ fill: "red", shape: "dot", text: " " });
|
node.status({ fill: "red", shape: "dot", text: " " });
|
||||||
@ -254,16 +395,92 @@ module.exports = function(RED) {
|
|||||||
node.tout = setTimeout(function () { setupStream() }, 15000);
|
node.tout = setTimeout(function () { setupStream() }, 15000);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
});
|
|
||||||
}
|
return stream;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// if 4 numeric tags that look like a geo area then set geo area
|
try {
|
||||||
var bits = node.tags.split(",");
|
var tags = node.tags;
|
||||||
if (bits.length == 4) {
|
var st = { track: [tags] };
|
||||||
if ((Number(bits[0]) < Number(bits[2])) && (Number(bits[1]) < Number(bits[3]))) {
|
var setupStream = function () {
|
||||||
st = { locations: node.tags };
|
if (node.restart) {
|
||||||
node.log(RED._("twitter.status.using-geo",{location:node.tags.toString()}));
|
(async () => {
|
||||||
|
let currentRules;
|
||||||
|
console.warn = ({ topic: "warning", payload: node.restart })
|
||||||
|
try {
|
||||||
|
// Gets the complete list of rules currently applied to the stream
|
||||||
|
currentRules = await getAllRules();
|
||||||
|
// Delete all rules. Comment the line below if you want to keep your existing rules.
|
||||||
|
await deleteAllRules(currentRules);
|
||||||
|
// Add rules to the stream. Comment the line below if you don't want to add new rules.
|
||||||
|
await setRules();
|
||||||
|
} catch (e) {
|
||||||
|
console.error(e);
|
||||||
|
process.exit(1);
|
||||||
|
}
|
||||||
|
// Listen to the stream.
|
||||||
|
// node.status({fill:"green", shape:"dot", text:(node.tags||" ")});
|
||||||
|
console.log("Twitter API is steraming public tweets with search term " + node.tags || " ");
|
||||||
|
|
||||||
|
var flag = false;
|
||||||
|
const stream = needle.get(streamURL, {
|
||||||
|
headers: {
|
||||||
|
"User-Agent": "v2FilterStreamJS",
|
||||||
|
"Authorization": `Bearer ${token}`
|
||||||
|
},
|
||||||
|
timeout: 20000
|
||||||
|
});
|
||||||
|
node.stream = stream;
|
||||||
|
|
||||||
|
stream.on('data', data => {
|
||||||
|
try {
|
||||||
|
const json = JSON.parse(data);
|
||||||
|
// console.log(json);
|
||||||
|
node.status({ fill: "green", shape: "dot", text: (tags || " ") });
|
||||||
|
var msg = { topic: "tweet", payload: json.data.text };
|
||||||
|
node.send(msg);
|
||||||
|
|
||||||
|
// A successful connection resets retry count.
|
||||||
|
retryAttempt = 0;
|
||||||
|
} catch (e) {
|
||||||
|
if (data.detail === "This stream is currently at the maximum allowed connection limit.") {
|
||||||
|
console.log(data.detail)
|
||||||
|
// process.exit(1)
|
||||||
|
} else {
|
||||||
|
// Keep alive signal received. Do nothing.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}).on('err', error => {
|
||||||
|
if (error.code !== 'ECONNRESET') {
|
||||||
|
console.log(error.code);
|
||||||
|
// process.exit(1);
|
||||||
|
} else {
|
||||||
|
// This reconnection logic will attempt to reconnect when a disconnection is detected.
|
||||||
|
// To avoid rate limits, this logic implements exponential backoff, so the wait time
|
||||||
|
// will increase if the client cannot reconnect to the stream.
|
||||||
|
setTimeout(() => {
|
||||||
|
console.warn("A connection error occurred. Reconnecting...")
|
||||||
|
streamConnect(++retryAttempt);
|
||||||
|
node.status({ fill: "red", shape: "ring", text: RED._("twitter.errors") });
|
||||||
|
}, 2 ** retryAttempt)
|
||||||
|
}
|
||||||
|
if (node.restart) {
|
||||||
|
node.tout = setTimeout(function () { setupStream() }, retry);
|
||||||
|
}
|
||||||
|
}).on('limit', limit => {
|
||||||
|
//node.status({fill:"grey", shape:"dot", text:RED._("twitter.errors.limitrate")});
|
||||||
|
node.status({ fill: "grey", shape: "dot", text: (tags || " ") });
|
||||||
|
}).on('destroy', function (response) {
|
||||||
|
twitterRateTimeout = Date.now() + 15000;
|
||||||
|
if (node.restart) {
|
||||||
|
node.status({ fill: "red", shape: "dot", text: " " });
|
||||||
|
node.warn(RED._("twitter.errors.unexpectedend"));
|
||||||
|
node.tout = setTimeout(function () { setupStream() }, 15000);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
})();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -276,7 +493,7 @@ module.exports = function(RED) {
|
|||||||
if (this.stream) {
|
if (this.stream) {
|
||||||
this.restart = false;
|
this.restart = false;
|
||||||
node.stream.removeAllListeners();
|
node.stream.removeAllListeners();
|
||||||
this.stream.destroy();
|
this.stream.request.abort();
|
||||||
}
|
}
|
||||||
if ((typeof msg.payload === "string") && (msg.payload !== "")) {
|
if ((typeof msg.payload === "string") && (msg.payload !== "")) {
|
||||||
st = { track: [msg.payload] };
|
st = { track: [msg.payload] };
|
||||||
@ -318,8 +535,8 @@ module.exports = function(RED) {
|
|||||||
if (this.tout2) { clearTimeout(this.tout2); }
|
if (this.tout2) { clearTimeout(this.tout2); }
|
||||||
if (this.stream) {
|
if (this.stream) {
|
||||||
this.restart = false;
|
this.restart = false;
|
||||||
this.stream.removeAllListeners();
|
node.stream.removeAllListeners();
|
||||||
this.stream.destroy();
|
this.stream.request.abort();
|
||||||
}
|
}
|
||||||
if (this.timeout_ids) {
|
if (this.timeout_ids) {
|
||||||
for (var i = 0; i < this.timeout_ids.length; i++) {
|
for (var i = 0; i < this.timeout_ids.length; i++) {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user